HBASE-26730 Extend hbase shell 'status' command to support an option 'tasks' (#4094)
Signed-off-by: Geoffrey Jacoby <gjacoby@apache.org> Signed-off-by: Viraj Jasani <vjasani@apache.org>
This commit is contained in:
parent
55a83ac1af
commit
3e84e50107
|
@ -161,6 +161,12 @@ public interface ClusterMetrics {
|
|||
*/
|
||||
Map<TableName, RegionStatesCount> getTableRegionStatesCount();
|
||||
|
||||
/**
|
||||
* Provide the list of master tasks
|
||||
*/
|
||||
@Nullable
|
||||
List<ServerTask> getMasterTasks();
|
||||
|
||||
/**
|
||||
* Kinds of ClusterMetrics
|
||||
*/
|
||||
|
@ -213,5 +219,9 @@ public interface ClusterMetrics {
|
|||
* metrics about table to no of regions status count
|
||||
*/
|
||||
TABLE_TO_REGIONS_COUNT,
|
||||
/**
|
||||
* metrics about monitored tasks
|
||||
*/
|
||||
TASKS,
|
||||
}
|
||||
}
|
||||
|
|
|
@ -83,6 +83,10 @@ public final class ClusterMetricsBuilder {
|
|||
if (metrics.getMasterName() != null) {
|
||||
builder.setMaster(ProtobufUtil.toServerName((metrics.getMasterName())));
|
||||
}
|
||||
if (metrics.getMasterTasks() != null) {
|
||||
builder.addAllMasterTasks(metrics.getMasterTasks().stream()
|
||||
.map(t -> ProtobufUtil.toServerTask(t)).collect(Collectors.toList()));
|
||||
}
|
||||
if (metrics.getBalancerOn() != null) {
|
||||
builder.setBalancerOn(metrics.getBalancerOn());
|
||||
}
|
||||
|
@ -122,7 +126,9 @@ public final class ClusterMetricsBuilder {
|
|||
proto.getTableRegionStatesCountList().stream()
|
||||
.collect(Collectors.toMap(
|
||||
e -> ProtobufUtil.toTableName(e.getTableName()),
|
||||
e -> ProtobufUtil.toTableRegionStatesCount(e.getRegionStatesCount()))));
|
||||
e -> ProtobufUtil.toTableRegionStatesCount(e.getRegionStatesCount()))))
|
||||
.setMasterTasks(proto.getMasterTasksList().stream()
|
||||
.map(t -> ProtobufUtil.getServerTask(t)).collect(Collectors.toList()));
|
||||
if (proto.hasClusterId()) {
|
||||
builder.setClusterId(ClusterId.convert(proto.getClusterId()).toString());
|
||||
}
|
||||
|
@ -164,6 +170,7 @@ public final class ClusterMetricsBuilder {
|
|||
case SERVERS_NAME: return ClusterMetrics.Option.SERVERS_NAME;
|
||||
case MASTER_INFO_PORT: return ClusterMetrics.Option.MASTER_INFO_PORT;
|
||||
case TABLE_TO_REGIONS_COUNT: return ClusterMetrics.Option.TABLE_TO_REGIONS_COUNT;
|
||||
case TASKS: return ClusterMetrics.Option.TASKS;
|
||||
// should not reach here
|
||||
default: throw new IllegalArgumentException("Invalid option: " + option);
|
||||
}
|
||||
|
@ -188,6 +195,7 @@ public final class ClusterMetricsBuilder {
|
|||
case SERVERS_NAME: return Option.SERVERS_NAME;
|
||||
case MASTER_INFO_PORT: return ClusterStatusProtos.Option.MASTER_INFO_PORT;
|
||||
case TABLE_TO_REGIONS_COUNT: return ClusterStatusProtos.Option.TABLE_TO_REGIONS_COUNT;
|
||||
case TASKS: return ClusterStatusProtos.Option.TASKS;
|
||||
// should not reach here
|
||||
default: throw new IllegalArgumentException("Invalid option: " + option);
|
||||
}
|
||||
|
@ -231,6 +239,8 @@ public final class ClusterMetricsBuilder {
|
|||
private int masterInfoPort;
|
||||
private List<ServerName> serversName = Collections.emptyList();
|
||||
private Map<TableName, RegionStatesCount> tableRegionStatesCount = Collections.emptyMap();
|
||||
@Nullable
|
||||
private List<ServerTask> masterTasks;
|
||||
|
||||
private ClusterMetricsBuilder() {
|
||||
}
|
||||
|
@ -280,6 +290,10 @@ public final class ClusterMetricsBuilder {
|
|||
this.serversName = serversName;
|
||||
return this;
|
||||
}
|
||||
public ClusterMetricsBuilder setMasterTasks(List<ServerTask> masterTasks) {
|
||||
this.masterTasks = masterTasks;
|
||||
return this;
|
||||
}
|
||||
|
||||
public ClusterMetricsBuilder setTableRegionStatesCount(
|
||||
Map<TableName, RegionStatesCount> tableRegionStatesCount) {
|
||||
|
@ -300,7 +314,8 @@ public final class ClusterMetricsBuilder {
|
|||
balancerOn,
|
||||
masterInfoPort,
|
||||
serversName,
|
||||
tableRegionStatesCount
|
||||
tableRegionStatesCount,
|
||||
masterTasks
|
||||
);
|
||||
}
|
||||
private static class ClusterMetricsImpl implements ClusterMetrics {
|
||||
|
@ -320,6 +335,7 @@ public final class ClusterMetricsBuilder {
|
|||
private final int masterInfoPort;
|
||||
private final List<ServerName> serversName;
|
||||
private final Map<TableName, RegionStatesCount> tableRegionStatesCount;
|
||||
private final List<ServerTask> masterTasks;
|
||||
|
||||
ClusterMetricsImpl(String hbaseVersion, List<ServerName> deadServerNames,
|
||||
Map<ServerName, ServerMetrics> liveServerMetrics,
|
||||
|
@ -331,7 +347,8 @@ public final class ClusterMetricsBuilder {
|
|||
Boolean balancerOn,
|
||||
int masterInfoPort,
|
||||
List<ServerName> serversName,
|
||||
Map<TableName, RegionStatesCount> tableRegionStatesCount) {
|
||||
Map<TableName, RegionStatesCount> tableRegionStatesCount,
|
||||
List<ServerTask> masterTasks) {
|
||||
this.hbaseVersion = hbaseVersion;
|
||||
this.deadServerNames = Preconditions.checkNotNull(deadServerNames);
|
||||
this.liveServerMetrics = Preconditions.checkNotNull(liveServerMetrics);
|
||||
|
@ -344,6 +361,7 @@ public final class ClusterMetricsBuilder {
|
|||
this.masterInfoPort = masterInfoPort;
|
||||
this.serversName = serversName;
|
||||
this.tableRegionStatesCount = Preconditions.checkNotNull(tableRegionStatesCount);
|
||||
this.masterTasks = masterTasks;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -406,6 +424,11 @@ public final class ClusterMetricsBuilder {
|
|||
return Collections.unmodifiableMap(tableRegionStatesCount);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ServerTask> getMasterTasks() {
|
||||
return masterTasks;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
StringBuilder sb = new StringBuilder(1024);
|
||||
|
|
|
@ -356,6 +356,11 @@ public class ClusterStatus implements ClusterMetrics {
|
|||
return metrics.getTableRegionStatesCount();
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ServerTask> getMasterTasks() {
|
||||
return metrics.getMasterTasks();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
StringBuilder sb = new StringBuilder(1024);
|
||||
|
|
|
@ -431,6 +431,11 @@ public class ServerLoad implements ServerMetrics {
|
|||
return metrics.getLastReportTimestamp();
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ServerTask> getTasks() {
|
||||
return metrics.getTasks();
|
||||
}
|
||||
|
||||
/**
|
||||
* Originally, this method factored in the effect of requests going to the
|
||||
* server as well. However, this does not interact very well with the current
|
||||
|
|
|
@ -114,4 +114,11 @@ public interface ServerMetrics {
|
|||
*/
|
||||
long getLastReportTimestamp();
|
||||
|
||||
/**
|
||||
* Called directly from clients such as the hbase shell
|
||||
* @return the active monitored tasks
|
||||
*/
|
||||
@Nullable
|
||||
List<ServerTask> getTasks();
|
||||
|
||||
}
|
||||
|
|
|
@ -85,6 +85,8 @@ public final class ServerMetricsBuilder {
|
|||
.setReplicationLoadSink(serverLoadPB.hasReplLoadSink()
|
||||
? ProtobufUtil.toReplicationLoadSink(serverLoadPB.getReplLoadSink())
|
||||
: null)
|
||||
.setTasks(serverLoadPB.getTasksList().stream()
|
||||
.map(ProtobufUtil::getServerTask).collect(Collectors.toList()))
|
||||
.setReportTimestamp(serverLoadPB.getReportEndTime())
|
||||
.setLastReportTimestamp(serverLoadPB.getReportStartTime()).setVersionNumber(versionNumber)
|
||||
.setVersion(version).build();
|
||||
|
@ -103,19 +105,24 @@ public final class ServerMetricsBuilder {
|
|||
.setInfoServerPort(metrics.getInfoServerPort())
|
||||
.setMaxHeapMB((int) metrics.getMaxHeapSize().get(Size.Unit.MEGABYTE))
|
||||
.setUsedHeapMB((int) metrics.getUsedHeapSize().get(Size.Unit.MEGABYTE))
|
||||
.addAllCoprocessors(toCoprocessor(metrics.getCoprocessorNames())).addAllRegionLoads(
|
||||
.addAllCoprocessors(toCoprocessor(metrics.getCoprocessorNames()))
|
||||
.addAllRegionLoads(
|
||||
metrics.getRegionMetrics().values().stream().map(RegionMetricsBuilder::toRegionLoad)
|
||||
.collect(Collectors.toList())).addAllUserLoads(
|
||||
.collect(Collectors.toList()))
|
||||
.addAllUserLoads(
|
||||
metrics.getUserMetrics().values().stream().map(UserMetricsBuilder::toUserMetrics)
|
||||
.collect(Collectors.toList())).addAllReplLoadSource(
|
||||
.collect(Collectors.toList()))
|
||||
.addAllReplLoadSource(
|
||||
metrics.getReplicationLoadSourceList().stream()
|
||||
.map(ProtobufUtil::toReplicationLoadSource).collect(Collectors.toList()))
|
||||
.addAllTasks(
|
||||
metrics.getTasks().stream().map(ProtobufUtil::toServerTask)
|
||||
.collect(Collectors.toList()))
|
||||
.setReportStartTime(metrics.getLastReportTimestamp())
|
||||
.setReportEndTime(metrics.getReportTimestamp());
|
||||
if (metrics.getReplicationLoadSink() != null) {
|
||||
builder.setReplLoadSink(ProtobufUtil.toReplicationLoadSink(metrics.getReplicationLoadSink()));
|
||||
}
|
||||
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
|
@ -139,6 +146,8 @@ public final class ServerMetricsBuilder {
|
|||
private final Set<String> coprocessorNames = new TreeSet<>();
|
||||
private long reportTimestamp = EnvironmentEdgeManager.currentTime();
|
||||
private long lastReportTimestamp = 0;
|
||||
private final List<ServerTask> tasks = new ArrayList<>();
|
||||
|
||||
private ServerMetricsBuilder(ServerName serverName) {
|
||||
this.serverName = serverName;
|
||||
}
|
||||
|
@ -213,6 +222,11 @@ public final class ServerMetricsBuilder {
|
|||
return this;
|
||||
}
|
||||
|
||||
public ServerMetricsBuilder setTasks(List<ServerTask> tasks) {
|
||||
this.tasks.addAll(tasks);
|
||||
return this;
|
||||
}
|
||||
|
||||
public ServerMetrics build() {
|
||||
return new ServerMetricsImpl(
|
||||
serverName,
|
||||
|
@ -229,7 +243,8 @@ public final class ServerMetricsBuilder {
|
|||
coprocessorNames,
|
||||
reportTimestamp,
|
||||
lastReportTimestamp,
|
||||
userMetrics);
|
||||
userMetrics,
|
||||
tasks);
|
||||
}
|
||||
|
||||
private static class ServerMetricsImpl implements ServerMetrics {
|
||||
|
@ -249,12 +264,14 @@ public final class ServerMetricsBuilder {
|
|||
private final long reportTimestamp;
|
||||
private final long lastReportTimestamp;
|
||||
private final Map<byte[], UserMetrics> userMetrics;
|
||||
private final List<ServerTask> tasks;
|
||||
|
||||
ServerMetricsImpl(ServerName serverName, int versionNumber, String version,
|
||||
long requestCountPerSecond, long requestCount, Size usedHeapSize, Size maxHeapSize,
|
||||
int infoServerPort, List<ReplicationLoadSource> sources, ReplicationLoadSink sink,
|
||||
Map<byte[], RegionMetrics> regionStatus, Set<String> coprocessorNames, long reportTimestamp,
|
||||
long lastReportTimestamp, Map<byte[], UserMetrics> userMetrics) {
|
||||
Map<byte[], RegionMetrics> regionStatus, Set<String> coprocessorNames,
|
||||
long reportTimestamp, long lastReportTimestamp, Map<byte[], UserMetrics> userMetrics,
|
||||
List<ServerTask> tasks) {
|
||||
this.serverName = Preconditions.checkNotNull(serverName);
|
||||
this.versionNumber = versionNumber;
|
||||
this.version = version;
|
||||
|
@ -270,6 +287,7 @@ public final class ServerMetricsBuilder {
|
|||
this.coprocessorNames =Preconditions.checkNotNull(coprocessorNames);
|
||||
this.reportTimestamp = reportTimestamp;
|
||||
this.lastReportTimestamp = lastReportTimestamp;
|
||||
this.tasks = tasks;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -356,6 +374,11 @@ public final class ServerMetricsBuilder {
|
|||
return lastReportTimestamp;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ServerTask> getTasks() {
|
||||
return tasks;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
int storeCount = 0;
|
||||
|
|
|
@ -0,0 +1,64 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase;
|
||||
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/** Information about active monitored server tasks */
|
||||
@InterfaceAudience.Public
|
||||
public interface ServerTask {
|
||||
|
||||
/** Task state */
|
||||
enum State {
|
||||
RUNNING,
|
||||
WAITING,
|
||||
COMPLETE,
|
||||
ABORTED;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the task's description.
|
||||
* @return the task's description, typically a name
|
||||
*/
|
||||
String getDescription();
|
||||
|
||||
/**
|
||||
* Get the current status of the task.
|
||||
* @return the task's current status
|
||||
*/
|
||||
String getStatus();
|
||||
|
||||
/**
|
||||
* Get the current state of the task.
|
||||
* @return the task's current state
|
||||
*/
|
||||
State getState();
|
||||
|
||||
/**
|
||||
* Get the task start time.
|
||||
* @return the time when the task started, or 0 if it has not started yet
|
||||
*/
|
||||
long getStartTime();
|
||||
|
||||
/**
|
||||
* Get the task completion time.
|
||||
* @return the time when the task completed, or 0 if it has not completed yet
|
||||
*/
|
||||
long getCompletionTime();
|
||||
|
||||
}
|
|
@ -0,0 +1,126 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase;
|
||||
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/** Builder for information about active monitored server tasks */
|
||||
@InterfaceAudience.Private
|
||||
public final class ServerTaskBuilder {
|
||||
|
||||
public static ServerTaskBuilder newBuilder() {
|
||||
return new ServerTaskBuilder();
|
||||
}
|
||||
|
||||
private String description = "";
|
||||
private String status = "";
|
||||
private ServerTask.State state = ServerTask.State.RUNNING;
|
||||
private long startTime;
|
||||
private long completionTime;
|
||||
|
||||
private ServerTaskBuilder() { }
|
||||
|
||||
private static final class ServerTaskImpl implements ServerTask {
|
||||
|
||||
private final String description;
|
||||
private final String status;
|
||||
private final ServerTask.State state;
|
||||
private final long startTime;
|
||||
private final long completionTime;
|
||||
|
||||
private ServerTaskImpl(final String description, final String status,
|
||||
final ServerTask.State state, final long startTime, final long completionTime) {
|
||||
this.description = description;
|
||||
this.status = status;
|
||||
this.state = state;
|
||||
this.startTime = startTime;
|
||||
this.completionTime = completionTime;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getDescription() {
|
||||
return description;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getStatus() {
|
||||
return status;
|
||||
}
|
||||
|
||||
@Override
|
||||
public State getState() {
|
||||
return state;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getStartTime() {
|
||||
return startTime;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getCompletionTime() {
|
||||
return completionTime;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
StringBuilder sb = new StringBuilder(512);
|
||||
sb.append(getDescription());
|
||||
sb.append(": status=");
|
||||
sb.append(getStatus());
|
||||
sb.append(", state=");
|
||||
sb.append(getState());
|
||||
sb.append(", startTime=");
|
||||
sb.append(getStartTime());
|
||||
sb.append(", completionTime=");
|
||||
sb.append(getCompletionTime());
|
||||
return sb.toString();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public ServerTaskBuilder setDescription(final String description) {
|
||||
this.description = description;
|
||||
return this;
|
||||
}
|
||||
|
||||
public ServerTaskBuilder setStatus(final String status) {
|
||||
this.status = status;
|
||||
return this;
|
||||
}
|
||||
|
||||
public ServerTaskBuilder setState(final ServerTask.State state) {
|
||||
this.state = state;
|
||||
return this;
|
||||
}
|
||||
|
||||
public ServerTaskBuilder setStartTime(final long startTime) {
|
||||
this.startTime = startTime;
|
||||
return this;
|
||||
}
|
||||
|
||||
public ServerTaskBuilder setCompletionTime(final long completionTime) {
|
||||
this.completionTime = completionTime;
|
||||
return this;
|
||||
}
|
||||
|
||||
public ServerTask build() {
|
||||
return new ServerTaskImpl(description, status, state, startTime, completionTime);
|
||||
}
|
||||
|
||||
}
|
|
@ -66,6 +66,8 @@ import org.apache.hadoop.hbase.HRegionLocation;
|
|||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.NamespaceDescriptor;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.ServerTask;
|
||||
import org.apache.hadoop.hbase.ServerTaskBuilder;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.Append;
|
||||
import org.apache.hadoop.hbase.client.BalanceResponse;
|
||||
|
@ -3792,4 +3794,24 @@ public final class ProtobufUtil {
|
|||
.build();
|
||||
}
|
||||
|
||||
public static ServerTask getServerTask(ClusterStatusProtos.ServerTask task) {
|
||||
return ServerTaskBuilder.newBuilder()
|
||||
.setDescription(task.getDescription())
|
||||
.setStatus(task.getStatus())
|
||||
.setState(ServerTask.State.valueOf(task.getState().name()))
|
||||
.setStartTime(task.getStartTime())
|
||||
.setCompletionTime(task.getCompletionTime())
|
||||
.build();
|
||||
}
|
||||
|
||||
public static ClusterStatusProtos.ServerTask toServerTask(ServerTask task) {
|
||||
return ClusterStatusProtos.ServerTask.newBuilder()
|
||||
.setDescription(task.getDescription())
|
||||
.setStatus(task.getStatus())
|
||||
.setState(ClusterStatusProtos.ServerTask.State.valueOf(task.getState().name()))
|
||||
.setStartTime(task.getStartTime())
|
||||
.setCompletionTime(task.getCompletionTime())
|
||||
.build();
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -229,6 +229,21 @@ message ReplicationLoadSource {
|
|||
optional uint64 oPsShipped = 12;
|
||||
}
|
||||
|
||||
message ServerTask {
|
||||
required string description = 1;
|
||||
required string status = 2;
|
||||
required State state = 3;
|
||||
optional uint64 startTime = 4;
|
||||
optional uint64 completionTime = 5;
|
||||
|
||||
enum State {
|
||||
RUNNING = 0;
|
||||
WAITING = 1;
|
||||
COMPLETE = 2;
|
||||
ABORTED = 3;
|
||||
}
|
||||
}
|
||||
|
||||
message ServerLoad {
|
||||
/** Number of requests since last report. */
|
||||
optional uint64 number_of_requests = 1;
|
||||
|
@ -285,6 +300,11 @@ message ServerLoad {
|
|||
* The metrics for each user on this region server
|
||||
*/
|
||||
repeated UserLoad userLoads = 12;
|
||||
|
||||
/**
|
||||
* The active monitored tasks
|
||||
*/
|
||||
repeated ServerTask tasks = 15; /* 15 here to stay in sync with master branch */
|
||||
}
|
||||
|
||||
message LiveServerInfo {
|
||||
|
@ -318,6 +338,7 @@ message ClusterStatus {
|
|||
optional int32 master_info_port = 10 [default = -1];
|
||||
repeated ServerName servers_name = 11;
|
||||
repeated TableRegionStatesCount table_region_states_count = 12;
|
||||
repeated ServerTask master_tasks = 13;
|
||||
}
|
||||
|
||||
enum Option {
|
||||
|
@ -333,4 +354,5 @@ enum Option {
|
|||
MASTER_INFO_PORT = 9;
|
||||
SERVERS_NAME = 10;
|
||||
TABLE_TO_REGIONS_COUNT = 11;
|
||||
TASKS = 12;
|
||||
}
|
||||
|
|
|
@ -79,6 +79,8 @@ import org.apache.hadoop.hbase.RegionMetrics;
|
|||
import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
|
||||
import org.apache.hadoop.hbase.ServerMetrics;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.ServerTask;
|
||||
import org.apache.hadoop.hbase.ServerTaskBuilder;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.TableNotDisabledException;
|
||||
import org.apache.hadoop.hbase.TableNotFoundException;
|
||||
|
@ -2712,16 +2714,39 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
options = EnumSet.allOf(Option.class);
|
||||
}
|
||||
|
||||
// TASKS and/or LIVE_SERVERS will populate this map, which will be given to the builder if
|
||||
// not null after option processing completes.
|
||||
Map<ServerName, ServerMetrics> serverMetricsMap = null;
|
||||
|
||||
for (Option opt : options) {
|
||||
switch (opt) {
|
||||
case HBASE_VERSION: builder.setHBaseVersion(VersionInfo.getVersion()); break;
|
||||
case CLUSTER_ID: builder.setClusterId(getClusterId()); break;
|
||||
case MASTER: builder.setMasterName(getServerName()); break;
|
||||
case BACKUP_MASTERS: builder.setBackerMasterNames(getBackupMasters()); break;
|
||||
case TASKS: {
|
||||
// Master tasks
|
||||
builder.setMasterTasks(TaskMonitor.get().getTasks().stream()
|
||||
.map(task -> ServerTaskBuilder.newBuilder()
|
||||
.setDescription(task.getDescription())
|
||||
.setStatus(task.getStatus())
|
||||
.setState(ServerTask.State.valueOf(task.getState().name()))
|
||||
.setStartTime(task.getStartTime())
|
||||
.setCompletionTime(task.getCompletionTimestamp())
|
||||
.build())
|
||||
.collect(Collectors.toList()));
|
||||
// TASKS is also synonymous with LIVE_SERVERS for now because task information for
|
||||
// regionservers is carried in ServerLoad.
|
||||
// Add entries to serverMetricsMap for all live servers, if we haven't already done so
|
||||
if (serverMetricsMap == null) {
|
||||
serverMetricsMap = getOnlineServers();
|
||||
}
|
||||
break;
|
||||
}
|
||||
case LIVE_SERVERS: {
|
||||
if (serverManager != null) {
|
||||
builder.setLiveServerMetrics(serverManager.getOnlineServers().entrySet().stream()
|
||||
.collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue())));
|
||||
// Add entries to serverMetricsMap for all live servers, if we haven't already done so
|
||||
if (serverMetricsMap == null) {
|
||||
serverMetricsMap = getOnlineServers();
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
@ -2783,9 +2808,24 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (serverMetricsMap != null) {
|
||||
builder.setLiveServerMetrics(serverMetricsMap);
|
||||
}
|
||||
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
private Map<ServerName, ServerMetrics> getOnlineServers() {
|
||||
if (serverManager != null) {
|
||||
final Map<ServerName, ServerMetrics> map = new HashMap<>();
|
||||
serverManager.getOnlineServers().entrySet()
|
||||
.forEach(e -> map.put(e.getKey(), e.getValue()));
|
||||
return map;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return cluster status
|
||||
*/
|
||||
|
|
|
@ -127,6 +127,7 @@ import org.apache.hadoop.hbase.master.HMaster;
|
|||
import org.apache.hadoop.hbase.master.LoadBalancer;
|
||||
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
|
||||
import org.apache.hadoop.hbase.mob.MobFileCache;
|
||||
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
|
||||
import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
|
||||
import org.apache.hadoop.hbase.namequeues.SlowLogTableOpsChore;
|
||||
import org.apache.hadoop.hbase.net.Address;
|
||||
|
@ -1461,10 +1462,10 @@ public class HRegionServer extends Thread implements
|
|||
serverLoad.addUserLoads(createUserLoad(entry.getKey(), entry.getValue()));
|
||||
}
|
||||
}
|
||||
|
||||
// for the replicationLoad purpose. Only need to get from one executorService
|
||||
// either source or sink will get the same info
|
||||
ReplicationSourceService rsources = getReplicationSourceService();
|
||||
|
||||
if (rsources != null) {
|
||||
// always refresh first to get the latest value
|
||||
ReplicationLoad rLoad = rsources.refreshAndGetReplicationLoad();
|
||||
|
@ -1478,6 +1479,15 @@ public class HRegionServer extends Thread implements
|
|||
}
|
||||
}
|
||||
|
||||
TaskMonitor.get().getTasks().forEach(task ->
|
||||
serverLoad.addTasks(ClusterStatusProtos.ServerTask.newBuilder()
|
||||
.setDescription(task.getDescription())
|
||||
.setStatus(task.getStatus() != null ? task.getStatus() : "")
|
||||
.setState(ClusterStatusProtos.ServerTask.State.valueOf(task.getState().name()))
|
||||
.setStartTime(task.getStartTime())
|
||||
.setCompletionTime(task.getCompletionTimestamp())
|
||||
.build()));
|
||||
|
||||
return serverLoad.build();
|
||||
}
|
||||
|
||||
|
|
|
@ -48,12 +48,14 @@ import org.apache.hadoop.hbase.coprocessor.MasterObserver;
|
|||
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
|
||||
import org.apache.hadoop.hbase.filter.FilterAllFilter;
|
||||
import org.apache.hadoop.hbase.master.HMaster;
|
||||
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||
import org.apache.hadoop.hbase.regionserver.MetricsUserAggregateFactory;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.hbase.security.UserProvider;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
|
||||
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
|
||||
import org.junit.AfterClass;
|
||||
|
@ -79,6 +81,17 @@ public class TestClientClusterMetrics {
|
|||
private static final TableName TABLE_NAME = TableName.valueOf("test");
|
||||
private static final byte[] CF = Bytes.toBytes("cf");
|
||||
|
||||
// We need to promote the visibility of tryRegionServerReport for this test
|
||||
public static class MyRegionServer extends MiniHBaseCluster.MiniHBaseClusterRegionServer {
|
||||
public MyRegionServer(Configuration conf) throws IOException, InterruptedException {
|
||||
super(conf);
|
||||
}
|
||||
@Override
|
||||
public void tryRegionServerReport(long reportStartTime, long reportEndTime)
|
||||
throws IOException {
|
||||
super.tryRegionServerReport(reportStartTime, reportEndTime);
|
||||
}
|
||||
}
|
||||
|
||||
@BeforeClass
|
||||
public static void setUpBeforeClass() throws Exception {
|
||||
|
@ -86,6 +99,7 @@ public class TestClientClusterMetrics {
|
|||
conf.set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, MyObserver.class.getName());
|
||||
UTIL = new HBaseTestingUtility(conf);
|
||||
StartMiniClusterOption option = StartMiniClusterOption.builder()
|
||||
.rsClass(TestClientClusterMetrics.MyRegionServer.class)
|
||||
.numMasters(MASTERS).numRegionServers(SLAVES).numDataNodes(SLAVES).build();
|
||||
UTIL.startMiniCluster(option);
|
||||
CLUSTER = UTIL.getHBaseCluster();
|
||||
|
@ -250,7 +264,8 @@ public class TestClientClusterMetrics {
|
|||
Assert.assertEquals(MASTERS - 1, metrics.getBackupMasterNames().size());
|
||||
}
|
||||
|
||||
@Test public void testUserMetrics() throws Exception {
|
||||
@Test
|
||||
public void testUserMetrics() throws Exception {
|
||||
Configuration conf = UTIL.getConfiguration();
|
||||
// If metrics for users is not enabled, this test doesn't make sense.
|
||||
if (!conf.getBoolean(MetricsUserAggregateFactory.METRIC_USER_ENABLED_CONF,
|
||||
|
@ -338,6 +353,48 @@ public class TestClientClusterMetrics {
|
|||
UTIL.deleteTable(TABLE_NAME);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testServerTasks() throws Exception {
|
||||
// TaskMonitor is a singleton per VM, so will be shared among all minicluster "servers",
|
||||
// so we only need to look at the first live server's results to find it.
|
||||
final String testTaskName = "TEST TASK";
|
||||
TaskMonitor.get().createStatus(testTaskName).setStatus("Testing 1... 2... 3...");
|
||||
// Of course, first we must trigger regionserver reports.
|
||||
final long now = EnvironmentEdgeManager.currentTime();
|
||||
final long last = now - 1000; // fake a period, or someone might div by zero
|
||||
for (RegionServerThread rs: CLUSTER.getRegionServerThreads()) {
|
||||
((MyRegionServer)rs.getRegionServer()).tryRegionServerReport(last, now);
|
||||
}
|
||||
// Get status now
|
||||
ClusterMetrics clusterMetrics = ADMIN.getClusterMetrics(EnumSet.of(Option.TASKS));
|
||||
// The test task will be in the master metrics list
|
||||
boolean found = false;
|
||||
for (ServerTask task: clusterMetrics.getMasterTasks()) {
|
||||
if (testTaskName.equals(task.getDescription())) {
|
||||
// Found it
|
||||
found = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
Assert.assertTrue("Expected task not found in master task list", found);
|
||||
// Get the tasks information (carried in server metrics)
|
||||
found = false;
|
||||
for (ServerMetrics serverMetrics: clusterMetrics.getLiveServerMetrics().values()) {
|
||||
if (serverMetrics.getTasks() != null) {
|
||||
for (ServerTask task: serverMetrics.getTasks()) {
|
||||
if (testTaskName.equals(task.getDescription())) {
|
||||
// Found it
|
||||
found = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// We will fall through here if getClusterMetrics(TASKS) did not correctly process the
|
||||
// task list.
|
||||
Assert.assertTrue("Expected task not found in server load", found);
|
||||
}
|
||||
|
||||
private RegionMetrics getMetaMetrics() throws IOException {
|
||||
for (ServerMetrics serverMetrics : ADMIN.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS))
|
||||
.getLiveServerMetrics().values()) {
|
||||
|
|
|
@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
|
|||
import org.apache.hadoop.hbase.RegionMetrics;
|
||||
import org.apache.hadoop.hbase.ServerMetrics;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.ServerTask;
|
||||
import org.apache.hadoop.hbase.Size;
|
||||
import org.apache.hadoop.hbase.Stoppable;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
|
@ -293,6 +294,11 @@ public class TestRegionsRecoveryChore {
|
|||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ServerTask> getMasterTasks() {
|
||||
return null;
|
||||
}
|
||||
|
||||
};
|
||||
return clusterMetrics;
|
||||
}
|
||||
|
@ -377,6 +383,11 @@ public class TestRegionsRecoveryChore {
|
|||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ServerTask> getTasks() {
|
||||
return null;
|
||||
}
|
||||
|
||||
};
|
||||
return serverMetrics;
|
||||
}
|
||||
|
|
|
@ -850,12 +850,16 @@ module Hbase
|
|||
puts(format(' %s', v))
|
||||
end
|
||||
master = status.getMaster
|
||||
puts(format('active master: %s:%d %d', master.getHostname, master.getPort, master.getStartcode))
|
||||
unless master.nil?
|
||||
puts(format('active master: %s:%d %d', master.getHostname, master.getPort, master.getStartcode))
|
||||
for task in status.getMasterTasks
|
||||
puts(format(' %s', task.toString))
|
||||
end
|
||||
end
|
||||
puts(format('%d backup masters', status.getBackupMastersSize))
|
||||
for server in status.getBackupMasters
|
||||
puts(format(' %s:%d %d', server.getHostname, server.getPort, server.getStartcode))
|
||||
end
|
||||
|
||||
master_coprocs = java.util.Arrays.toString(@admin.getMasterCoprocessors)
|
||||
unless master_coprocs.nil?
|
||||
puts(format('master coprocessors: %s', master_coprocs))
|
||||
|
@ -868,6 +872,9 @@ module Hbase
|
|||
puts(format(' %s', region.getNameAsString.dump))
|
||||
puts(format(' %s', region.toString))
|
||||
end
|
||||
for task in status.getLoad(server).getTasks
|
||||
puts(format(' %s', task.toString))
|
||||
end
|
||||
end
|
||||
puts(format('%d dead servers', status.getDeadServersSize))
|
||||
for server in status.getDeadServerNames
|
||||
|
@ -906,6 +913,33 @@ module Hbase
|
|||
puts(format('%<sink>s', sink: r_sink_string))
|
||||
end
|
||||
end
|
||||
elsif format == 'tasks'
|
||||
master = status.getMaster
|
||||
unless master.nil?
|
||||
puts(format('active master: %s:%d %d', master.getHostname, master.getPort, master.getStartcode))
|
||||
printed = false
|
||||
for task in status.getMasterTasks
|
||||
next unless task.getState.name == 'RUNNING'
|
||||
puts(format(' %s', task.toString))
|
||||
printed = true
|
||||
end
|
||||
if !printed
|
||||
puts(' no active tasks')
|
||||
end
|
||||
end
|
||||
puts(format('%d live servers', status.getServersSize))
|
||||
for server in status.getServers
|
||||
puts(format(' %s:%d %d', server.getHostname, server.getPort, server.getStartcode))
|
||||
printed = false
|
||||
for task in status.getLoad(server).getTasks
|
||||
next unless task.getState.name == 'RUNNING'
|
||||
puts(format(' %s', task.toString))
|
||||
printed = true
|
||||
end
|
||||
if !printed
|
||||
puts(' no active tasks')
|
||||
end
|
||||
end
|
||||
elsif format == 'simple'
|
||||
load = 0
|
||||
regions = 0
|
||||
|
|
|
@ -22,13 +22,14 @@ module Shell
|
|||
class Status < Command
|
||||
def help
|
||||
<<-EOF
|
||||
Show cluster status. Can be 'summary', 'simple', 'detailed', or 'replication'. The
|
||||
Show cluster status. Can be 'summary', 'simple', 'detailed', 'tasks', or 'replication'. The
|
||||
default is 'summary'. Examples:
|
||||
|
||||
hbase> status
|
||||
hbase> status 'simple'
|
||||
hbase> status 'summary'
|
||||
hbase> status 'simple'
|
||||
hbase> status 'detailed'
|
||||
hbase> status 'tasks'
|
||||
hbase> status 'replication'
|
||||
hbase> status 'replication', 'source'
|
||||
hbase> status 'replication', 'sink'
|
||||
|
|
Loading…
Reference in New Issue