HBASE-23065 [hbtop] Top-N heavy hitter user and client drill downs
Signed-off-by: Toshihiro Suzuki <brfrn169@gmail.com> Signed-off-by: Josh Elser <elserj@apache.org> Signed-off-by: Andrew Purtell <apurtell@apache.org>
This commit is contained in:
parent
2e7a7cadd5
commit
6e6c7b3c2d
|
@ -412,6 +412,10 @@ public class ServerLoad implements ServerMetrics {
|
||||||
return metrics.getRegionMetrics();
|
return metrics.getRegionMetrics();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override public Map<byte[], UserMetrics> getUserMetrics() {
|
||||||
|
return metrics.getUserMetrics();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Set<String> getCoprocessorNames() {
|
public Set<String> getCoprocessorNames() {
|
||||||
return metrics.getCoprocessorNames();
|
return metrics.getCoprocessorNames();
|
||||||
|
|
|
@ -93,6 +93,11 @@ public interface ServerMetrics {
|
||||||
*/
|
*/
|
||||||
Map<byte[], RegionMetrics> getRegionMetrics();
|
Map<byte[], RegionMetrics> getRegionMetrics();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return metrics per user
|
||||||
|
*/
|
||||||
|
Map<byte[], UserMetrics> getUserMetrics();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Return the RegionServer-level and Region-level coprocessors
|
* Return the RegionServer-level and Region-level coprocessors
|
||||||
* @return string set of loaded RegionServer-level and Region-level coprocessors
|
* @return string set of loaded RegionServer-level and Region-level coprocessors
|
||||||
|
|
|
@ -77,6 +77,8 @@ public final class ServerMetricsBuilder {
|
||||||
.map(HBaseProtos.Coprocessor::getName).collect(Collectors.toList()))
|
.map(HBaseProtos.Coprocessor::getName).collect(Collectors.toList()))
|
||||||
.setRegionMetrics(serverLoadPB.getRegionLoadsList().stream()
|
.setRegionMetrics(serverLoadPB.getRegionLoadsList().stream()
|
||||||
.map(RegionMetricsBuilder::toRegionMetrics).collect(Collectors.toList()))
|
.map(RegionMetricsBuilder::toRegionMetrics).collect(Collectors.toList()))
|
||||||
|
.setUserMetrics(serverLoadPB.getUserLoadsList().stream()
|
||||||
|
.map(UserMetricsBuilder::toUserMetrics).collect(Collectors.toList()))
|
||||||
.setReplicationLoadSources(serverLoadPB.getReplLoadSourceList().stream()
|
.setReplicationLoadSources(serverLoadPB.getReplLoadSourceList().stream()
|
||||||
.map(ProtobufUtil::toReplicationLoadSource).collect(Collectors.toList()))
|
.map(ProtobufUtil::toReplicationLoadSource).collect(Collectors.toList()))
|
||||||
.setReplicationLoadSink(serverLoadPB.hasReplLoadSink()
|
.setReplicationLoadSink(serverLoadPB.hasReplLoadSink()
|
||||||
|
@ -100,19 +102,19 @@ public final class ServerMetricsBuilder {
|
||||||
.setInfoServerPort(metrics.getInfoServerPort())
|
.setInfoServerPort(metrics.getInfoServerPort())
|
||||||
.setMaxHeapMB((int) metrics.getMaxHeapSize().get(Size.Unit.MEGABYTE))
|
.setMaxHeapMB((int) metrics.getMaxHeapSize().get(Size.Unit.MEGABYTE))
|
||||||
.setUsedHeapMB((int) metrics.getUsedHeapSize().get(Size.Unit.MEGABYTE))
|
.setUsedHeapMB((int) metrics.getUsedHeapSize().get(Size.Unit.MEGABYTE))
|
||||||
.addAllCoprocessors(toCoprocessor(metrics.getCoprocessorNames()))
|
.addAllCoprocessors(toCoprocessor(metrics.getCoprocessorNames())).addAllRegionLoads(
|
||||||
.addAllRegionLoads(metrics.getRegionMetrics().values().stream()
|
metrics.getRegionMetrics().values().stream().map(RegionMetricsBuilder::toRegionLoad)
|
||||||
.map(RegionMetricsBuilder::toRegionLoad)
|
.collect(Collectors.toList())).addAllUserLoads(
|
||||||
.collect(Collectors.toList()))
|
metrics.getUserMetrics().values().stream().map(UserMetricsBuilder::toUserMetrics)
|
||||||
.addAllReplLoadSource(metrics.getReplicationLoadSourceList().stream()
|
.collect(Collectors.toList())).addAllReplLoadSource(
|
||||||
.map(ProtobufUtil::toReplicationLoadSource)
|
metrics.getReplicationLoadSourceList().stream()
|
||||||
.collect(Collectors.toList()))
|
.map(ProtobufUtil::toReplicationLoadSource).collect(Collectors.toList()))
|
||||||
.setReportStartTime(metrics.getLastReportTimestamp())
|
.setReportStartTime(metrics.getLastReportTimestamp())
|
||||||
.setReportEndTime(metrics.getReportTimestamp());
|
.setReportEndTime(metrics.getReportTimestamp());
|
||||||
if (metrics.getReplicationLoadSink() != null) {
|
if (metrics.getReplicationLoadSink() != null) {
|
||||||
builder.setReplLoadSink(ProtobufUtil.toReplicationLoadSink(
|
builder.setReplLoadSink(ProtobufUtil.toReplicationLoadSink(metrics.getReplicationLoadSink()));
|
||||||
metrics.getReplicationLoadSink()));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return builder.build();
|
return builder.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -132,6 +134,7 @@ public final class ServerMetricsBuilder {
|
||||||
@Nullable
|
@Nullable
|
||||||
private ReplicationLoadSink sink = null;
|
private ReplicationLoadSink sink = null;
|
||||||
private final Map<byte[], RegionMetrics> regionStatus = new TreeMap<>(Bytes.BYTES_COMPARATOR);
|
private final Map<byte[], RegionMetrics> regionStatus = new TreeMap<>(Bytes.BYTES_COMPARATOR);
|
||||||
|
private final Map<byte[], UserMetrics> userMetrics = new TreeMap<>(Bytes.BYTES_COMPARATOR);
|
||||||
private final Set<String> coprocessorNames = new TreeSet<>();
|
private final Set<String> coprocessorNames = new TreeSet<>();
|
||||||
private long reportTimestamp = System.currentTimeMillis();
|
private long reportTimestamp = System.currentTimeMillis();
|
||||||
private long lastReportTimestamp = 0;
|
private long lastReportTimestamp = 0;
|
||||||
|
@ -189,6 +192,11 @@ public final class ServerMetricsBuilder {
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public ServerMetricsBuilder setUserMetrics(List<UserMetrics> value) {
|
||||||
|
value.forEach(v -> this.userMetrics.put(v.getUserName(), v));
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
public ServerMetricsBuilder setCoprocessorNames(List<String> value) {
|
public ServerMetricsBuilder setCoprocessorNames(List<String> value) {
|
||||||
coprocessorNames.addAll(value);
|
coprocessorNames.addAll(value);
|
||||||
return this;
|
return this;
|
||||||
|
@ -219,7 +227,8 @@ public final class ServerMetricsBuilder {
|
||||||
regionStatus,
|
regionStatus,
|
||||||
coprocessorNames,
|
coprocessorNames,
|
||||||
reportTimestamp,
|
reportTimestamp,
|
||||||
lastReportTimestamp);
|
lastReportTimestamp,
|
||||||
|
userMetrics);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class ServerMetricsImpl implements ServerMetrics {
|
private static class ServerMetricsImpl implements ServerMetrics {
|
||||||
|
@ -238,12 +247,13 @@ public final class ServerMetricsBuilder {
|
||||||
private final Set<String> coprocessorNames;
|
private final Set<String> coprocessorNames;
|
||||||
private final long reportTimestamp;
|
private final long reportTimestamp;
|
||||||
private final long lastReportTimestamp;
|
private final long lastReportTimestamp;
|
||||||
|
private final Map<byte[], UserMetrics> userMetrics;
|
||||||
|
|
||||||
ServerMetricsImpl(ServerName serverName, int versionNumber, String version,
|
ServerMetricsImpl(ServerName serverName, int versionNumber, String version,
|
||||||
long requestCountPerSecond, long requestCount, Size usedHeapSize, Size maxHeapSize,
|
long requestCountPerSecond, long requestCount, Size usedHeapSize, Size maxHeapSize,
|
||||||
int infoServerPort, List<ReplicationLoadSource> sources, ReplicationLoadSink sink,
|
int infoServerPort, List<ReplicationLoadSource> sources, ReplicationLoadSink sink,
|
||||||
Map<byte[], RegionMetrics> regionStatus, Set<String> coprocessorNames, long reportTimestamp,
|
Map<byte[], RegionMetrics> regionStatus, Set<String> coprocessorNames, long reportTimestamp,
|
||||||
long lastReportTimestamp) {
|
long lastReportTimestamp, Map<byte[], UserMetrics> userMetrics) {
|
||||||
this.serverName = Preconditions.checkNotNull(serverName);
|
this.serverName = Preconditions.checkNotNull(serverName);
|
||||||
this.versionNumber = versionNumber;
|
this.versionNumber = versionNumber;
|
||||||
this.version = version;
|
this.version = version;
|
||||||
|
@ -255,6 +265,7 @@ public final class ServerMetricsBuilder {
|
||||||
this.sources = Preconditions.checkNotNull(sources);
|
this.sources = Preconditions.checkNotNull(sources);
|
||||||
this.sink = sink;
|
this.sink = sink;
|
||||||
this.regionStatus = Preconditions.checkNotNull(regionStatus);
|
this.regionStatus = Preconditions.checkNotNull(regionStatus);
|
||||||
|
this.userMetrics = Preconditions.checkNotNull(userMetrics);
|
||||||
this.coprocessorNames =Preconditions.checkNotNull(coprocessorNames);
|
this.coprocessorNames =Preconditions.checkNotNull(coprocessorNames);
|
||||||
this.reportTimestamp = reportTimestamp;
|
this.reportTimestamp = reportTimestamp;
|
||||||
this.lastReportTimestamp = lastReportTimestamp;
|
this.lastReportTimestamp = lastReportTimestamp;
|
||||||
|
@ -324,6 +335,11 @@ public final class ServerMetricsBuilder {
|
||||||
return Collections.unmodifiableMap(regionStatus);
|
return Collections.unmodifiableMap(regionStatus);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Map<byte[], UserMetrics> getUserMetrics() {
|
||||||
|
return Collections.unmodifiableMap(userMetrics);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Set<String> getCoprocessorNames() {
|
public Set<String> getCoprocessorNames() {
|
||||||
return Collections.unmodifiableSet(coprocessorNames);
|
return Collections.unmodifiableSet(coprocessorNames);
|
||||||
|
|
|
@ -0,0 +1,86 @@
|
||||||
|
/**
|
||||||
|
* Copyright The Apache Software Foundation
|
||||||
|
*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.hbase;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
import org.apache.yetus.audience.InterfaceStability;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Encapsulates per-user load metrics.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Public
|
||||||
|
@InterfaceStability.Evolving
|
||||||
|
public interface UserMetrics {
|
||||||
|
|
||||||
|
interface ClientMetrics {
|
||||||
|
|
||||||
|
String getHostName();
|
||||||
|
|
||||||
|
long getReadRequestsCount();
|
||||||
|
|
||||||
|
long getWriteRequestsCount();
|
||||||
|
|
||||||
|
long getFilteredReadRequestsCount();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return the user name
|
||||||
|
*/
|
||||||
|
byte[] getUserName();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return the number of read requests made by user
|
||||||
|
*/
|
||||||
|
long getReadRequestCount();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return the number of write requests made by user
|
||||||
|
*/
|
||||||
|
long getWriteRequestCount();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return the number of write requests and read requests and coprocessor
|
||||||
|
* service requests made by the user
|
||||||
|
*/
|
||||||
|
default long getRequestCount() {
|
||||||
|
return getReadRequestCount() + getWriteRequestCount();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return the user name as a string
|
||||||
|
*/
|
||||||
|
default String getNameAsString() {
|
||||||
|
return Bytes.toStringBinary(getUserName());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return metrics per client(hostname)
|
||||||
|
*/
|
||||||
|
Map<String, ClientMetrics> getClientMetrics();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return count of filtered read requests for a user
|
||||||
|
*/
|
||||||
|
long getFilteredReadRequests();
|
||||||
|
}
|
|
@ -0,0 +1,151 @@
|
||||||
|
/**
|
||||||
|
* Copyright The Apache Software Foundation
|
||||||
|
*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.hbase;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.util.Strings;
|
||||||
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
|
||||||
|
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
|
||||||
|
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
public final class UserMetricsBuilder {
|
||||||
|
|
||||||
|
public static UserMetrics toUserMetrics(ClusterStatusProtos.UserLoad userLoad) {
|
||||||
|
UserMetricsBuilder builder = UserMetricsBuilder.newBuilder(userLoad.getUserName().getBytes());
|
||||||
|
userLoad.getClientMetricsList().stream().map(
|
||||||
|
clientMetrics -> new ClientMetricsImpl(clientMetrics.getHostName(),
|
||||||
|
clientMetrics.getReadRequestsCount(), clientMetrics.getWriteRequestsCount(),
|
||||||
|
clientMetrics.getFilteredRequestsCount())).forEach(builder::addClientMetris);
|
||||||
|
return builder.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
public static ClusterStatusProtos.UserLoad toUserMetrics(UserMetrics userMetrics) {
|
||||||
|
ClusterStatusProtos.UserLoad.Builder builder =
|
||||||
|
ClusterStatusProtos.UserLoad.newBuilder().setUserName(userMetrics.getNameAsString());
|
||||||
|
userMetrics.getClientMetrics().values().stream().map(
|
||||||
|
clientMetrics -> ClusterStatusProtos.ClientMetrics.newBuilder()
|
||||||
|
.setHostName(clientMetrics.getHostName())
|
||||||
|
.setWriteRequestsCount(clientMetrics.getWriteRequestsCount())
|
||||||
|
.setReadRequestsCount(clientMetrics.getReadRequestsCount())
|
||||||
|
.setFilteredRequestsCount(clientMetrics.getFilteredReadRequestsCount()).build())
|
||||||
|
.forEach(builder::addClientMetrics);
|
||||||
|
return builder.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
public static UserMetricsBuilder newBuilder(byte[] name) {
|
||||||
|
return new UserMetricsBuilder(name);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private final byte[] name;
|
||||||
|
private Map<String, UserMetrics.ClientMetrics> clientMetricsMap = new HashMap<>();
|
||||||
|
private UserMetricsBuilder(byte[] name) {
|
||||||
|
this.name = name;
|
||||||
|
}
|
||||||
|
|
||||||
|
public UserMetricsBuilder addClientMetris(UserMetrics.ClientMetrics clientMetrics) {
|
||||||
|
clientMetricsMap.put(clientMetrics.getHostName(), clientMetrics);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public UserMetrics build() {
|
||||||
|
return new UserMetricsImpl(name, clientMetricsMap);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class ClientMetricsImpl implements UserMetrics.ClientMetrics {
|
||||||
|
private final long filteredReadRequestsCount;
|
||||||
|
private final String hostName;
|
||||||
|
private final long readRequestCount;
|
||||||
|
private final long writeRequestCount;
|
||||||
|
|
||||||
|
public ClientMetricsImpl(String hostName, long readRequest, long writeRequest,
|
||||||
|
long filteredReadRequestsCount) {
|
||||||
|
this.hostName = hostName;
|
||||||
|
this.readRequestCount = readRequest;
|
||||||
|
this.writeRequestCount = writeRequest;
|
||||||
|
this.filteredReadRequestsCount = filteredReadRequestsCount;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public String getHostName() {
|
||||||
|
return hostName;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public long getReadRequestsCount() {
|
||||||
|
return readRequestCount;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public long getWriteRequestsCount() {
|
||||||
|
return writeRequestCount;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public long getFilteredReadRequestsCount() {
|
||||||
|
return filteredReadRequestsCount;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class UserMetricsImpl implements UserMetrics {
|
||||||
|
private final byte[] name;
|
||||||
|
private final Map<String, ClientMetrics> clientMetricsMap;
|
||||||
|
|
||||||
|
UserMetricsImpl(byte[] name, Map<String, ClientMetrics> clientMetricsMap) {
|
||||||
|
this.name = Preconditions.checkNotNull(name);
|
||||||
|
this.clientMetricsMap = clientMetricsMap;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public byte[] getUserName() {
|
||||||
|
return name;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public long getReadRequestCount() {
|
||||||
|
return clientMetricsMap.values().stream().map(c -> c.getReadRequestsCount())
|
||||||
|
.reduce(0L, Long::sum);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public long getWriteRequestCount() {
|
||||||
|
return clientMetricsMap.values().stream().map(c -> c.getWriteRequestsCount())
|
||||||
|
.reduce(0L, Long::sum);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public Map<String, ClientMetrics> getClientMetrics() {
|
||||||
|
return this.clientMetricsMap;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public long getFilteredReadRequests() {
|
||||||
|
return clientMetricsMap.values().stream().map(c -> c.getFilteredReadRequestsCount())
|
||||||
|
.reduce(0L, Long::sum);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
StringBuilder sb = Strings
|
||||||
|
.appendKeyValue(new StringBuilder(), "readRequestCount", this.getReadRequestCount());
|
||||||
|
Strings.appendKeyValue(sb, "writeRequestCount", this.getWriteRequestCount());
|
||||||
|
Strings.appendKeyValue(sb, "filteredReadRequestCount", this.getFilteredReadRequests());
|
||||||
|
return sb.toString();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -18,6 +18,8 @@
|
||||||
|
|
||||||
package org.apache.hadoop.hbase.regionserver;
|
package org.apache.hadoop.hbase.regionserver;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.metrics.BaseSource;
|
import org.apache.hadoop.hbase.metrics.BaseSource;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
|
||||||
|
@ -59,4 +61,6 @@ public interface MetricsUserAggregateSource extends BaseSource {
|
||||||
MetricsUserSource getOrCreateMetricsUser(String user);
|
MetricsUserSource getOrCreateMetricsUser(String user);
|
||||||
|
|
||||||
void deregister(MetricsUserSource toRemove);
|
void deregister(MetricsUserSource toRemove);
|
||||||
|
|
||||||
|
Map<String, MetricsUserSource> getUserSources();
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,11 +18,31 @@
|
||||||
|
|
||||||
package org.apache.hadoop.hbase.regionserver;
|
package org.apache.hadoop.hbase.regionserver;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import org.apache.hadoop.metrics2.MetricsCollector;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public interface MetricsUserSource extends Comparable<MetricsUserSource> {
|
public interface MetricsUserSource extends Comparable<MetricsUserSource> {
|
||||||
|
|
||||||
|
//These client metrics will be reported through clusterStatus and hbtop only
|
||||||
|
interface ClientMetrics {
|
||||||
|
void incrementReadRequest();
|
||||||
|
|
||||||
|
void incrementWriteRequest();
|
||||||
|
|
||||||
|
String getHostName();
|
||||||
|
|
||||||
|
long getReadRequestsCount();
|
||||||
|
|
||||||
|
long getWriteRequestsCount();
|
||||||
|
|
||||||
|
void incrementFilteredReadRequests();
|
||||||
|
|
||||||
|
long getFilteredReadRequests();
|
||||||
|
}
|
||||||
|
|
||||||
String getUser();
|
String getUser();
|
||||||
|
|
||||||
void register();
|
void register();
|
||||||
|
@ -42,4 +62,21 @@ public interface MetricsUserSource extends Comparable<MetricsUserSource> {
|
||||||
void updateReplay(long t);
|
void updateReplay(long t);
|
||||||
|
|
||||||
void updateScanTime(long t);
|
void updateScanTime(long t);
|
||||||
|
|
||||||
|
void getMetrics(MetricsCollector metricsCollector, boolean all);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Metrics collected at client level for a user(needed for reporting through clusterStatus
|
||||||
|
* and hbtop currently)
|
||||||
|
* @return metrics per hostname
|
||||||
|
*/
|
||||||
|
Map<String, ClientMetrics> getClientMetrics();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a instance of ClientMetrics if not present otherwise return the previous one
|
||||||
|
*
|
||||||
|
* @param hostName hostname of the client
|
||||||
|
* @return Instance of ClientMetrics
|
||||||
|
*/
|
||||||
|
ClientMetrics getOrCreateMetricsClient(String hostName);
|
||||||
}
|
}
|
||||||
|
|
|
@ -47,4 +47,10 @@ public interface MetricHistogram {
|
||||||
*/
|
*/
|
||||||
void add(long value);
|
void add(long value);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return the total number of values added to the histogram.
|
||||||
|
* @return the total number of values.
|
||||||
|
*/
|
||||||
|
long getCount();
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,6 +18,8 @@
|
||||||
|
|
||||||
package org.apache.hadoop.hbase.regionserver;
|
package org.apache.hadoop.hbase.regionserver;
|
||||||
|
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.Map;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.metrics.BaseSourceImpl;
|
import org.apache.hadoop.hbase.metrics.BaseSourceImpl;
|
||||||
|
@ -28,8 +30,6 @@ import org.apache.yetus.audience.InterfaceAudience;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
|
||||||
|
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public class MetricsUserAggregateSourceImpl extends BaseSourceImpl
|
public class MetricsUserAggregateSourceImpl extends BaseSourceImpl
|
||||||
implements MetricsUserAggregateSource {
|
implements MetricsUserAggregateSource {
|
||||||
|
@ -90,9 +90,9 @@ public class MetricsUserAggregateSourceImpl extends BaseSourceImpl
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@Override
|
||||||
public ConcurrentHashMap<String, MetricsUserSource> getUserSources() {
|
public Map<String, MetricsUserSource> getUserSources() {
|
||||||
return userSources;
|
return Collections.unmodifiableMap(userSources);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -18,9 +18,14 @@
|
||||||
|
|
||||||
package org.apache.hadoop.hbase.regionserver;
|
package org.apache.hadoop.hbase.regionserver;
|
||||||
|
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
import java.util.concurrent.atomic.LongAdder;
|
||||||
|
|
||||||
import org.apache.hadoop.metrics2.MetricHistogram;
|
import org.apache.hadoop.metrics2.MetricHistogram;
|
||||||
|
import org.apache.hadoop.metrics2.MetricsCollector;
|
||||||
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
|
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
|
||||||
import org.apache.hadoop.metrics2.lib.DynamicMetricsRegistry;
|
import org.apache.hadoop.metrics2.lib.DynamicMetricsRegistry;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
@ -57,6 +62,48 @@ public class MetricsUserSourceImpl implements MetricsUserSource {
|
||||||
private final MetricsUserAggregateSourceImpl agg;
|
private final MetricsUserAggregateSourceImpl agg;
|
||||||
private final DynamicMetricsRegistry registry;
|
private final DynamicMetricsRegistry registry;
|
||||||
|
|
||||||
|
private ConcurrentHashMap<String, ClientMetrics> clientMetricsMap;
|
||||||
|
|
||||||
|
static class ClientMetricsImpl implements ClientMetrics {
|
||||||
|
private final String hostName;
|
||||||
|
final LongAdder readRequestsCount = new LongAdder();
|
||||||
|
final LongAdder writeRequestsCount = new LongAdder();
|
||||||
|
final LongAdder filteredRequestsCount = new LongAdder();
|
||||||
|
|
||||||
|
public ClientMetricsImpl(String hostName) {
|
||||||
|
this.hostName = hostName;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public void incrementReadRequest() {
|
||||||
|
readRequestsCount.increment();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public void incrementWriteRequest() {
|
||||||
|
writeRequestsCount.increment();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public String getHostName() {
|
||||||
|
return hostName;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public long getReadRequestsCount() {
|
||||||
|
return readRequestsCount.sum();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public long getWriteRequestsCount() {
|
||||||
|
return writeRequestsCount.sum();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public void incrementFilteredReadRequests() {
|
||||||
|
filteredRequestsCount.increment();
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public long getFilteredReadRequests() {
|
||||||
|
return filteredRequestsCount.sum();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public MetricsUserSourceImpl(String user, MetricsUserAggregateSourceImpl agg) {
|
public MetricsUserSourceImpl(String user, MetricsUserAggregateSourceImpl agg) {
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("Creating new MetricsUserSourceImpl for user " + user);
|
LOG.debug("Creating new MetricsUserSourceImpl for user " + user);
|
||||||
|
@ -77,7 +124,7 @@ public class MetricsUserSourceImpl implements MetricsUserSource {
|
||||||
userIncrementKey = userNamePrefix + MetricsRegionServerSource.INCREMENT_KEY;
|
userIncrementKey = userNamePrefix + MetricsRegionServerSource.INCREMENT_KEY;
|
||||||
userAppendKey = userNamePrefix + MetricsRegionServerSource.APPEND_KEY;
|
userAppendKey = userNamePrefix + MetricsRegionServerSource.APPEND_KEY;
|
||||||
userReplayKey = userNamePrefix + MetricsRegionServerSource.REPLAY_KEY;
|
userReplayKey = userNamePrefix + MetricsRegionServerSource.REPLAY_KEY;
|
||||||
|
clientMetricsMap = new ConcurrentHashMap<>();
|
||||||
agg.register(this);
|
agg.register(this);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -204,4 +251,27 @@ public class MetricsUserSourceImpl implements MetricsUserSource {
|
||||||
public void updateScanTime(long t) {
|
public void updateScanTime(long t) {
|
||||||
scanTimeHisto.add(t);
|
scanTimeHisto.add(t);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override public void getMetrics(MetricsCollector metricsCollector, boolean all) {
|
||||||
|
MetricsRecordBuilder mrb = metricsCollector.addRecord(this.userNamePrefix);
|
||||||
|
registry.snapshot(mrb, all);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public Map<String, ClientMetrics> getClientMetrics() {
|
||||||
|
return Collections.unmodifiableMap(clientMetricsMap);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public ClientMetrics getOrCreateMetricsClient(String client) {
|
||||||
|
ClientMetrics source = clientMetricsMap.get(client);
|
||||||
|
if (source != null) {
|
||||||
|
return source;
|
||||||
|
}
|
||||||
|
source = new ClientMetricsImpl(client);
|
||||||
|
ClientMetrics prev = clientMetricsMap.putIfAbsent(client, source);
|
||||||
|
if (prev != null) {
|
||||||
|
return prev;
|
||||||
|
}
|
||||||
|
return source;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -51,6 +51,10 @@ public class MutableHistogram extends MutableMetric implements MetricHistogram {
|
||||||
histogram.update(val);
|
histogram.update(val);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override public long getCount() {
|
||||||
|
return histogram.getCount();
|
||||||
|
}
|
||||||
|
|
||||||
public long getMax() {
|
public long getMax() {
|
||||||
return histogram.getMax();
|
return histogram.getMax();
|
||||||
}
|
}
|
||||||
|
|
|
@ -83,5 +83,9 @@ public abstract class MutableRangeHistogram extends MutableHistogram implements
|
||||||
Interns.info(name + "_" + rangeType + "_" + ranges[ranges.length - 1] + "-inf", desc),
|
Interns.info(name + "_" + rangeType + "_" + ranges[ranges.length - 1] + "-inf", desc),
|
||||||
val - cumNum);
|
val - cumNum);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override public long getCount() {
|
||||||
|
return histogram.getCount();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -143,6 +143,10 @@ public final class RecordFilter {
|
||||||
this.value = Objects.requireNonNull(value);
|
this.value = Objects.requireNonNull(value);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Field getField() {
|
||||||
|
return field;
|
||||||
|
}
|
||||||
|
|
||||||
public boolean execute(Record record) {
|
public boolean execute(Record record) {
|
||||||
FieldValue fieldValue = record.get(field);
|
FieldValue fieldValue = record.get(field);
|
||||||
if (fieldValue == null) {
|
if (fieldValue == null) {
|
||||||
|
|
|
@ -57,7 +57,11 @@ public enum Field {
|
||||||
FieldValueType.STRING),
|
FieldValueType.STRING),
|
||||||
REGION_COUNT("#REGION", "Region Count", false, false, FieldValueType.INTEGER),
|
REGION_COUNT("#REGION", "Region Count", false, false, FieldValueType.INTEGER),
|
||||||
USED_HEAP_SIZE("UHEAP", "Used Heap Size", false, false, FieldValueType.SIZE),
|
USED_HEAP_SIZE("UHEAP", "Used Heap Size", false, false, FieldValueType.SIZE),
|
||||||
MAX_HEAP_SIZE("MHEAP", "Max Heap Size", false, false, FieldValueType.SIZE);
|
USER("USER", "user Name", true, true, FieldValueType.STRING),
|
||||||
|
MAX_HEAP_SIZE("MHEAP", "Max Heap Size", false, false, FieldValueType.SIZE),
|
||||||
|
CLIENT_COUNT("#CLIENT", "Client Count", false, false, FieldValueType.INTEGER),
|
||||||
|
USER_COUNT("#USER", "User Count", false, false, FieldValueType.INTEGER),
|
||||||
|
CLIENT("CLIENT", "Client Hostname", true, true, FieldValueType.STRING);
|
||||||
|
|
||||||
private final String header;
|
private final String header;
|
||||||
private final String description;
|
private final String description;
|
||||||
|
|
|
@ -0,0 +1,157 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.hbtop.mode;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.ClusterMetrics;
|
||||||
|
import org.apache.hadoop.hbase.ServerMetrics;
|
||||||
|
import org.apache.hadoop.hbase.UserMetrics;
|
||||||
|
import org.apache.hadoop.hbase.hbtop.Record;
|
||||||
|
import org.apache.hadoop.hbase.hbtop.RecordFilter;
|
||||||
|
import org.apache.hadoop.hbase.hbtop.field.Field;
|
||||||
|
import org.apache.hadoop.hbase.hbtop.field.FieldInfo;
|
||||||
|
import org.apache.hadoop.hbase.hbtop.field.FieldValue;
|
||||||
|
import org.apache.hadoop.hbase.hbtop.field.FieldValueType;
|
||||||
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Implementation for {@link ModeStrategy} for client Mode.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private public final class ClientModeStrategy implements ModeStrategy {
|
||||||
|
|
||||||
|
private final List<FieldInfo> fieldInfos = Arrays
|
||||||
|
.asList(new FieldInfo(Field.CLIENT, 0, true),
|
||||||
|
new FieldInfo(Field.USER_COUNT, 5, true),
|
||||||
|
new FieldInfo(Field.REQUEST_COUNT_PER_SECOND, 10, true),
|
||||||
|
new FieldInfo(Field.READ_REQUEST_COUNT_PER_SECOND, 10, true),
|
||||||
|
new FieldInfo(Field.WRITE_REQUEST_COUNT_PER_SECOND, 10, true),
|
||||||
|
new FieldInfo(Field.FILTERED_READ_REQUEST_COUNT_PER_SECOND, 10, true));
|
||||||
|
private final Map<String, RequestCountPerSecond> requestCountPerSecondMap = new HashMap<>();
|
||||||
|
|
||||||
|
ClientModeStrategy() {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public List<FieldInfo> getFieldInfos() {
|
||||||
|
return fieldInfos;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public Field getDefaultSortField() {
|
||||||
|
return Field.REQUEST_COUNT_PER_SECOND;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public List<Record> getRecords(ClusterMetrics clusterMetrics,
|
||||||
|
List<RecordFilter> pushDownFilters) {
|
||||||
|
List<Record> records = createRecords(clusterMetrics);
|
||||||
|
return aggregateRecordsAndAddDistinct(
|
||||||
|
ModeStrategyUtils.applyFilterAndGet(records, pushDownFilters), Field.CLIENT, Field.USER,
|
||||||
|
Field.USER_COUNT);
|
||||||
|
}
|
||||||
|
|
||||||
|
List<Record> createRecords(ClusterMetrics clusterMetrics) {
|
||||||
|
List<Record> ret = new ArrayList<>();
|
||||||
|
for (ServerMetrics serverMetrics : clusterMetrics.getLiveServerMetrics().values()) {
|
||||||
|
long lastReportTimestamp = serverMetrics.getLastReportTimestamp();
|
||||||
|
serverMetrics.getUserMetrics().values().forEach(um -> um.getClientMetrics().values().forEach(
|
||||||
|
clientMetrics -> ret.add(
|
||||||
|
createRecord(um.getNameAsString(), clientMetrics, lastReportTimestamp,
|
||||||
|
serverMetrics.getServerName().getServerName()))));
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Aggregate the records and count the unique values for the given distinctField
|
||||||
|
*
|
||||||
|
* @param records records to be processed
|
||||||
|
* @param groupBy Field on which group by needs to be done
|
||||||
|
* @param distinctField Field whose unique values needs to be counted
|
||||||
|
* @param uniqueCountAssignedTo a target field to which the unique count is assigned to
|
||||||
|
* @return aggregated records
|
||||||
|
*/
|
||||||
|
List<Record> aggregateRecordsAndAddDistinct(List<Record> records, Field groupBy,
|
||||||
|
Field distinctField, Field uniqueCountAssignedTo) {
|
||||||
|
List<Record> result = new ArrayList<>();
|
||||||
|
records.stream().collect(Collectors.groupingBy(r -> r.get(groupBy))).values()
|
||||||
|
.forEach(val -> {
|
||||||
|
Set<FieldValue> distinctValues = new HashSet<>();
|
||||||
|
Map<Field, FieldValue> map = new HashMap<>();
|
||||||
|
for (Record record : val) {
|
||||||
|
for (Map.Entry<Field, FieldValue> field : record.entrySet()) {
|
||||||
|
if (distinctField.equals(field.getKey())) {
|
||||||
|
//We will not be adding the field in the new record whose distinct count is required
|
||||||
|
distinctValues.add(record.get(distinctField));
|
||||||
|
} else {
|
||||||
|
if (field.getKey().getFieldValueType() == FieldValueType.STRING) {
|
||||||
|
map.put(field.getKey(), field.getValue());
|
||||||
|
} else {
|
||||||
|
if (map.get(field.getKey()) == null) {
|
||||||
|
map.put(field.getKey(), field.getValue());
|
||||||
|
} else {
|
||||||
|
map.put(field.getKey(), map.get(field.getKey()).plus(field.getValue()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Add unique count field
|
||||||
|
map.put(uniqueCountAssignedTo, uniqueCountAssignedTo.newValue(distinctValues.size()));
|
||||||
|
result.add(Record.ofEntries(map.entrySet().stream()
|
||||||
|
.map(k -> Record.entry(k.getKey(), k.getValue()))));
|
||||||
|
});
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
Record createRecord(String user, UserMetrics.ClientMetrics clientMetrics,
|
||||||
|
long lastReportTimestamp, String server) {
|
||||||
|
Record.Builder builder = Record.builder();
|
||||||
|
String client = clientMetrics.getHostName();
|
||||||
|
builder.put(Field.CLIENT, clientMetrics.getHostName());
|
||||||
|
String mapKey = client + "$" + user + "$" + server;
|
||||||
|
RequestCountPerSecond requestCountPerSecond = requestCountPerSecondMap.get(mapKey);
|
||||||
|
if (requestCountPerSecond == null) {
|
||||||
|
requestCountPerSecond = new RequestCountPerSecond();
|
||||||
|
requestCountPerSecondMap.put(mapKey, requestCountPerSecond);
|
||||||
|
}
|
||||||
|
requestCountPerSecond.refresh(lastReportTimestamp, clientMetrics.getReadRequestsCount(),
|
||||||
|
clientMetrics.getFilteredReadRequestsCount(), clientMetrics.getWriteRequestsCount());
|
||||||
|
builder.put(Field.REQUEST_COUNT_PER_SECOND, requestCountPerSecond.getRequestCountPerSecond());
|
||||||
|
builder.put(Field.READ_REQUEST_COUNT_PER_SECOND,
|
||||||
|
requestCountPerSecond.getReadRequestCountPerSecond());
|
||||||
|
builder.put(Field.WRITE_REQUEST_COUNT_PER_SECOND,
|
||||||
|
requestCountPerSecond.getWriteRequestCountPerSecond());
|
||||||
|
builder.put(Field.FILTERED_READ_REQUEST_COUNT_PER_SECOND,
|
||||||
|
requestCountPerSecond.getFilteredReadRequestCountPerSecond());
|
||||||
|
builder.put(Field.USER, user);
|
||||||
|
return builder.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public DrillDownInfo drillDown(Record selectedRecord) {
|
||||||
|
List<RecordFilter> initialFilters = Collections.singletonList(
|
||||||
|
RecordFilter.newBuilder(Field.CLIENT).doubleEquals(selectedRecord.get(Field.CLIENT)));
|
||||||
|
return new DrillDownInfo(Mode.USER, initialFilters);
|
||||||
|
}
|
||||||
|
}
|
|
@ -22,6 +22,7 @@ import java.util.List;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
import org.apache.hadoop.hbase.ClusterMetrics;
|
import org.apache.hadoop.hbase.ClusterMetrics;
|
||||||
import org.apache.hadoop.hbase.hbtop.Record;
|
import org.apache.hadoop.hbase.hbtop.Record;
|
||||||
|
import org.apache.hadoop.hbase.hbtop.RecordFilter;
|
||||||
import org.apache.hadoop.hbase.hbtop.field.Field;
|
import org.apache.hadoop.hbase.hbtop.field.Field;
|
||||||
import org.apache.hadoop.hbase.hbtop.field.FieldInfo;
|
import org.apache.hadoop.hbase.hbtop.field.FieldInfo;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
@ -35,7 +36,9 @@ public enum Mode {
|
||||||
NAMESPACE("Namespace", "Record per Namespace", new NamespaceModeStrategy()),
|
NAMESPACE("Namespace", "Record per Namespace", new NamespaceModeStrategy()),
|
||||||
TABLE("Table", "Record per Table", new TableModeStrategy()),
|
TABLE("Table", "Record per Table", new TableModeStrategy()),
|
||||||
REGION("Region", "Record per Region", new RegionModeStrategy()),
|
REGION("Region", "Record per Region", new RegionModeStrategy()),
|
||||||
REGION_SERVER("RegionServer", "Record per RegionServer", new RegionServerModeStrategy());
|
REGION_SERVER("RegionServer", "Record per RegionServer", new RegionServerModeStrategy()),
|
||||||
|
USER("User", "Record per user", new UserModeStrategy()),
|
||||||
|
CLIENT("Client", "Record per client", new ClientModeStrategy());
|
||||||
|
|
||||||
private final String header;
|
private final String header;
|
||||||
private final String description;
|
private final String description;
|
||||||
|
@ -55,8 +58,9 @@ public enum Mode {
|
||||||
return description;
|
return description;
|
||||||
}
|
}
|
||||||
|
|
||||||
public List<Record> getRecords(ClusterMetrics clusterMetrics) {
|
public List<Record> getRecords(ClusterMetrics clusterMetrics,
|
||||||
return modeStrategy.getRecords(clusterMetrics);
|
List<RecordFilter> pushDownFilters) {
|
||||||
|
return modeStrategy.getRecords(clusterMetrics, pushDownFilters);
|
||||||
}
|
}
|
||||||
|
|
||||||
public List<FieldInfo> getFieldInfos() {
|
public List<FieldInfo> getFieldInfos() {
|
||||||
|
|
|
@ -21,6 +21,7 @@ import edu.umd.cs.findbugs.annotations.Nullable;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import org.apache.hadoop.hbase.ClusterMetrics;
|
import org.apache.hadoop.hbase.ClusterMetrics;
|
||||||
import org.apache.hadoop.hbase.hbtop.Record;
|
import org.apache.hadoop.hbase.hbtop.Record;
|
||||||
|
import org.apache.hadoop.hbase.hbtop.RecordFilter;
|
||||||
import org.apache.hadoop.hbase.hbtop.field.Field;
|
import org.apache.hadoop.hbase.hbtop.field.Field;
|
||||||
import org.apache.hadoop.hbase.hbtop.field.FieldInfo;
|
import org.apache.hadoop.hbase.hbtop.field.FieldInfo;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
@ -33,6 +34,6 @@ import org.apache.yetus.audience.InterfaceAudience;
|
||||||
interface ModeStrategy {
|
interface ModeStrategy {
|
||||||
List<FieldInfo> getFieldInfos();
|
List<FieldInfo> getFieldInfos();
|
||||||
Field getDefaultSortField();
|
Field getDefaultSortField();
|
||||||
List<Record> getRecords(ClusterMetrics clusterMetrics);
|
List<Record> getRecords(ClusterMetrics clusterMetrics, List<RecordFilter> pushDownFilters);
|
||||||
@Nullable DrillDownInfo drillDown(Record selectedRecord);
|
@Nullable DrillDownInfo drillDown(Record selectedRecord);
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,63 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.hbtop.mode;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.hbtop.Record;
|
||||||
|
import org.apache.hadoop.hbase.hbtop.RecordFilter;
|
||||||
|
import org.apache.hadoop.hbase.hbtop.field.Field;
|
||||||
|
|
||||||
|
public final class ModeStrategyUtils {
|
||||||
|
private ModeStrategyUtils() {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Filter records as per the supplied filters,
|
||||||
|
* @param records records to be processed
|
||||||
|
* @param filters List of filters
|
||||||
|
* @return filtered records
|
||||||
|
*/
|
||||||
|
public static List<Record> applyFilterAndGet(List<Record> records,
|
||||||
|
List<RecordFilter> filters) {
|
||||||
|
if (filters != null && !filters.isEmpty()) {
|
||||||
|
return records.stream().filter(r -> filters.stream().allMatch(f -> f.execute(r)))
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
}
|
||||||
|
return records;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Group by records on the basis of supplied groupBy field and
|
||||||
|
* Aggregate records using {@link Record#combine(Record)}
|
||||||
|
*
|
||||||
|
* @param records records needs to be processed
|
||||||
|
* @param groupBy Field to be used for group by
|
||||||
|
* @return aggregated records
|
||||||
|
*/
|
||||||
|
public static List<Record> aggregateRecords(List<Record> records, Field groupBy) {
|
||||||
|
return records.stream().collect(Collectors.groupingBy(r -> r.get(groupBy))).entrySet().stream()
|
||||||
|
.flatMap(e -> e.getValue().stream().reduce(Record::combine).map(Stream::of)
|
||||||
|
.orElse(Stream.empty())).collect(Collectors.toList());
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -20,8 +20,7 @@ package org.apache.hadoop.hbase.hbtop.mode;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.stream.Collectors;
|
|
||||||
import java.util.stream.Stream;
|
|
||||||
import org.apache.hadoop.hbase.ClusterMetrics;
|
import org.apache.hadoop.hbase.ClusterMetrics;
|
||||||
import org.apache.hadoop.hbase.hbtop.Record;
|
import org.apache.hadoop.hbase.hbtop.Record;
|
||||||
import org.apache.hadoop.hbase.hbtop.RecordFilter;
|
import org.apache.hadoop.hbase.hbtop.RecordFilter;
|
||||||
|
@ -64,27 +63,14 @@ public final class NamespaceModeStrategy implements ModeStrategy {
|
||||||
return Field.REQUEST_COUNT_PER_SECOND;
|
return Field.REQUEST_COUNT_PER_SECOND;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override public List<Record> getRecords(ClusterMetrics clusterMetrics,
|
||||||
public List<Record> getRecords(ClusterMetrics clusterMetrics) {
|
List<RecordFilter> pushDownFilters) {
|
||||||
// Get records from RegionModeStrategy and add REGION_COUNT field
|
// Get records from RegionModeStrategy and add REGION_COUNT field
|
||||||
List<Record> records = regionModeStrategy.getRecords(clusterMetrics).stream()
|
List<Record> records = regionModeStrategy.selectModeFieldsAndAddCountField(fieldInfos,
|
||||||
.map(record ->
|
regionModeStrategy.getRecords(clusterMetrics, pushDownFilters), Field.REGION_COUNT);
|
||||||
Record.ofEntries(fieldInfos.stream()
|
|
||||||
.filter(fi -> record.containsKey(fi.getField()))
|
|
||||||
.map(fi -> Record.entry(fi.getField(), record.get(fi.getField())))))
|
|
||||||
.map(record -> Record.builder().putAll(record).put(Field.REGION_COUNT, 1).build())
|
|
||||||
.collect(Collectors.toList());
|
|
||||||
|
|
||||||
// Aggregation by NAMESPACE field
|
// Aggregation by NAMESPACE field
|
||||||
return records.stream()
|
return ModeStrategyUtils.aggregateRecords(records, Field.NAMESPACE);
|
||||||
.collect(Collectors.groupingBy(r -> r.get(Field.NAMESPACE).asString()))
|
|
||||||
.entrySet().stream()
|
|
||||||
.flatMap(
|
|
||||||
e -> e.getValue().stream()
|
|
||||||
.reduce(Record::combine)
|
|
||||||
.map(Stream::of)
|
|
||||||
.orElse(Stream.empty()))
|
|
||||||
.collect(Collectors.toList());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -24,6 +24,8 @@ import java.util.Arrays;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import org.apache.commons.lang3.time.FastDateFormat;
|
import org.apache.commons.lang3.time.FastDateFormat;
|
||||||
import org.apache.hadoop.hbase.ClusterMetrics;
|
import org.apache.hadoop.hbase.ClusterMetrics;
|
||||||
import org.apache.hadoop.hbase.RegionMetrics;
|
import org.apache.hadoop.hbase.RegionMetrics;
|
||||||
|
@ -31,6 +33,7 @@ import org.apache.hadoop.hbase.ServerMetrics;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||||
import org.apache.hadoop.hbase.hbtop.Record;
|
import org.apache.hadoop.hbase.hbtop.Record;
|
||||||
|
import org.apache.hadoop.hbase.hbtop.RecordFilter;
|
||||||
import org.apache.hadoop.hbase.hbtop.field.Field;
|
import org.apache.hadoop.hbase.hbtop.field.Field;
|
||||||
import org.apache.hadoop.hbase.hbtop.field.FieldInfo;
|
import org.apache.hadoop.hbase.hbtop.field.FieldInfo;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
@ -83,8 +86,8 @@ public final class RegionModeStrategy implements ModeStrategy {
|
||||||
return Field.REQUEST_COUNT_PER_SECOND;
|
return Field.REQUEST_COUNT_PER_SECOND;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override public List<Record> getRecords(ClusterMetrics clusterMetrics,
|
||||||
public List<Record> getRecords(ClusterMetrics clusterMetrics) {
|
List<RecordFilter> pushDownFilters) {
|
||||||
List<Record> ret = new ArrayList<>();
|
List<Record> ret = new ArrayList<>();
|
||||||
for (ServerMetrics sm : clusterMetrics.getLiveServerMetrics().values()) {
|
for (ServerMetrics sm : clusterMetrics.getLiveServerMetrics().values()) {
|
||||||
long lastReportTimestamp = sm.getLastReportTimestamp();
|
long lastReportTimestamp = sm.getLastReportTimestamp();
|
||||||
|
@ -174,6 +177,27 @@ public final class RegionModeStrategy implements ModeStrategy {
|
||||||
return builder.build();
|
return builder.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Form new record list with records formed by only fields provided through fieldInfo and
|
||||||
|
* add a count field for each record with value 1
|
||||||
|
* We are doing two operation of selecting and adding new field
|
||||||
|
* because of saving some CPU cycles on rebuilding the record again
|
||||||
|
*
|
||||||
|
* @param fieldInfos List of FieldInfos required in the record
|
||||||
|
* @param records List of records which needs to be processed
|
||||||
|
* @param countField Field which needs to be added with value 1 for each record
|
||||||
|
* @return records after selecting required fields and adding count field
|
||||||
|
*/
|
||||||
|
List<Record> selectModeFieldsAndAddCountField(List<FieldInfo> fieldInfos, List<Record> records,
|
||||||
|
Field countField) {
|
||||||
|
|
||||||
|
return records.stream().map(record -> Record.ofEntries(
|
||||||
|
fieldInfos.stream().filter(fi -> record.containsKey(fi.getField()))
|
||||||
|
.map(fi -> Record.entry(fi.getField(), record.get(fi.getField())))))
|
||||||
|
.map(record -> Record.builder().putAll(record).put(countField, 1).build())
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
}
|
||||||
|
|
||||||
@Nullable
|
@Nullable
|
||||||
@Override
|
@Override
|
||||||
public DrillDownInfo drillDown(Record selectedRecord) {
|
public DrillDownInfo drillDown(Record selectedRecord) {
|
||||||
|
|
|
@ -23,7 +23,7 @@ import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
import java.util.stream.Stream;
|
|
||||||
import org.apache.hadoop.hbase.ClusterMetrics;
|
import org.apache.hadoop.hbase.ClusterMetrics;
|
||||||
import org.apache.hadoop.hbase.ServerMetrics;
|
import org.apache.hadoop.hbase.ServerMetrics;
|
||||||
import org.apache.hadoop.hbase.hbtop.Record;
|
import org.apache.hadoop.hbase.hbtop.Record;
|
||||||
|
@ -70,27 +70,15 @@ public final class RegionServerModeStrategy implements ModeStrategy {
|
||||||
return Field.REQUEST_COUNT_PER_SECOND;
|
return Field.REQUEST_COUNT_PER_SECOND;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override public List<Record> getRecords(ClusterMetrics clusterMetrics,
|
||||||
public List<Record> getRecords(ClusterMetrics clusterMetrics) {
|
List<RecordFilter> pushDownFilters) {
|
||||||
// Get records from RegionModeStrategy and add REGION_COUNT field
|
// Get records from RegionModeStrategy and add REGION_COUNT field
|
||||||
List<Record> records = regionModeStrategy.getRecords(clusterMetrics).stream()
|
List<Record> records = regionModeStrategy.selectModeFieldsAndAddCountField(fieldInfos,
|
||||||
.map(record ->
|
regionModeStrategy.getRecords(clusterMetrics, pushDownFilters), Field.REGION_COUNT);
|
||||||
Record.ofEntries(fieldInfos.stream()
|
|
||||||
.filter(fi -> record.containsKey(fi.getField()))
|
|
||||||
.map(fi -> Record.entry(fi.getField(), record.get(fi.getField())))))
|
|
||||||
.map(record -> Record.builder().putAll(record).put(Field.REGION_COUNT, 1).build())
|
|
||||||
.collect(Collectors.toList());
|
|
||||||
|
|
||||||
// Aggregation by LONG_REGION_SERVER field
|
// Aggregation by LONG_REGION_SERVER field
|
||||||
Map<String, Record> retMap = records.stream()
|
Map<String, Record> retMap =
|
||||||
.collect(Collectors.groupingBy(r -> r.get(Field.LONG_REGION_SERVER).asString()))
|
ModeStrategyUtils.aggregateRecords(records, Field.LONG_REGION_SERVER).stream()
|
||||||
.entrySet().stream()
|
.collect(Collectors.toMap(r -> r.get(Field.LONG_REGION_SERVER).asString(), r -> r));
|
||||||
.flatMap(
|
|
||||||
e -> e.getValue().stream()
|
|
||||||
.reduce(Record::combine)
|
|
||||||
.map(Stream::of)
|
|
||||||
.orElse(Stream.empty()))
|
|
||||||
.collect(Collectors.toMap(r -> r.get(Field.LONG_REGION_SERVER).asString(), r -> r));
|
|
||||||
|
|
||||||
// Add USED_HEAP_SIZE field and MAX_HEAP_SIZE field
|
// Add USED_HEAP_SIZE field and MAX_HEAP_SIZE field
|
||||||
for (ServerMetrics sm : clusterMetrics.getLiveServerMetrics().values()) {
|
for (ServerMetrics sm : clusterMetrics.getLiveServerMetrics().values()) {
|
||||||
|
|
|
@ -65,16 +65,11 @@ public final class TableModeStrategy implements ModeStrategy {
|
||||||
return Field.REQUEST_COUNT_PER_SECOND;
|
return Field.REQUEST_COUNT_PER_SECOND;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override public List<Record> getRecords(ClusterMetrics clusterMetrics,
|
||||||
public List<Record> getRecords(ClusterMetrics clusterMetrics) {
|
List<RecordFilter> pushDownFilters) {
|
||||||
// Get records from RegionModeStrategy and add REGION_COUNT field
|
// Get records from RegionModeStrategy and add REGION_COUNT field
|
||||||
List<Record> records = regionModeStrategy.getRecords(clusterMetrics).stream()
|
List<Record> records = regionModeStrategy.selectModeFieldsAndAddCountField(fieldInfos,
|
||||||
.map(record ->
|
regionModeStrategy.getRecords(clusterMetrics, pushDownFilters), Field.REGION_COUNT);
|
||||||
Record.ofEntries(fieldInfos.stream()
|
|
||||||
.filter(fi -> record.containsKey(fi.getField()))
|
|
||||||
.map(fi -> Record.entry(fi.getField(), record.get(fi.getField())))))
|
|
||||||
.map(record -> Record.builder().putAll(record).put(Field.REGION_COUNT, 1).build())
|
|
||||||
.collect(Collectors.toList());
|
|
||||||
|
|
||||||
// Aggregation by NAMESPACE field and TABLE field
|
// Aggregation by NAMESPACE field and TABLE field
|
||||||
return records.stream()
|
return records.stream()
|
||||||
|
|
|
@ -0,0 +1,70 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.hbtop.mode;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.ClusterMetrics;
|
||||||
|
import org.apache.hadoop.hbase.hbtop.Record;
|
||||||
|
import org.apache.hadoop.hbase.hbtop.RecordFilter;
|
||||||
|
import org.apache.hadoop.hbase.hbtop.field.Field;
|
||||||
|
import org.apache.hadoop.hbase.hbtop.field.FieldInfo;
|
||||||
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Implementation for {@link ModeStrategy} for User Mode.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private public final class UserModeStrategy implements ModeStrategy {
|
||||||
|
|
||||||
|
private final List<FieldInfo> fieldInfos = Arrays
|
||||||
|
.asList(new FieldInfo(Field.USER, 0, true),
|
||||||
|
new FieldInfo(Field.CLIENT_COUNT, 7, true),
|
||||||
|
new FieldInfo(Field.REQUEST_COUNT_PER_SECOND, 10, true),
|
||||||
|
new FieldInfo(Field.READ_REQUEST_COUNT_PER_SECOND, 10, true),
|
||||||
|
new FieldInfo(Field.WRITE_REQUEST_COUNT_PER_SECOND, 10, true),
|
||||||
|
new FieldInfo(Field.FILTERED_READ_REQUEST_COUNT_PER_SECOND, 10, true));
|
||||||
|
private final ClientModeStrategy clientModeStrategy = new ClientModeStrategy();
|
||||||
|
|
||||||
|
UserModeStrategy() {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public List<FieldInfo> getFieldInfos() {
|
||||||
|
return fieldInfos;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public Field getDefaultSortField() {
|
||||||
|
return Field.REQUEST_COUNT_PER_SECOND;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public List<Record> getRecords(ClusterMetrics clusterMetrics,
|
||||||
|
List<RecordFilter> pushDownFilters) {
|
||||||
|
List<Record> records = clientModeStrategy.createRecords(clusterMetrics);
|
||||||
|
return clientModeStrategy.aggregateRecordsAndAddDistinct(
|
||||||
|
ModeStrategyUtils.applyFilterAndGet(records, pushDownFilters), Field.USER, Field.CLIENT,
|
||||||
|
Field.CLIENT_COUNT);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public DrillDownInfo drillDown(Record selectedRecord) {
|
||||||
|
//Drill down to client and using selected USER as a filter
|
||||||
|
List<RecordFilter> initialFilters = Collections.singletonList(
|
||||||
|
RecordFilter.newBuilder(Field.USER).doubleEquals(selectedRecord.get(Field.USER)));
|
||||||
|
return new DrillDownInfo(Mode.CLIENT, initialFilters);
|
||||||
|
}
|
||||||
|
}
|
|
@ -17,12 +17,15 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.hbtop.screen.top;
|
package org.apache.hadoop.hbase.hbtop.screen.top;
|
||||||
|
|
||||||
|
import static org.apache.commons.lang3.time.DateFormatUtils.ISO_8601_EXTENDED_TIME_FORMAT;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
import org.apache.commons.lang3.time.DateFormatUtils;
|
|
||||||
import org.apache.hadoop.hbase.ClusterMetrics;
|
import org.apache.hadoop.hbase.ClusterMetrics;
|
||||||
import org.apache.hadoop.hbase.client.Admin;
|
import org.apache.hadoop.hbase.client.Admin;
|
||||||
import org.apache.hadoop.hbase.hbtop.Record;
|
import org.apache.hadoop.hbase.hbtop.Record;
|
||||||
|
@ -56,6 +59,7 @@ public class TopScreenModel {
|
||||||
private List<Record> records;
|
private List<Record> records;
|
||||||
|
|
||||||
private final List<RecordFilter> filters = new ArrayList<>();
|
private final List<RecordFilter> filters = new ArrayList<>();
|
||||||
|
private final List<RecordFilter> pushDownFilters = new ArrayList<>();
|
||||||
private final List<String> filterHistories = new ArrayList<>();
|
private final List<String> filterHistories = new ArrayList<>();
|
||||||
|
|
||||||
private boolean ascendingSort;
|
private boolean ascendingSort;
|
||||||
|
@ -88,6 +92,7 @@ public class TopScreenModel {
|
||||||
if (initialFilters != null) {
|
if (initialFilters != null) {
|
||||||
filters.addAll(initialFilters);
|
filters.addAll(initialFilters);
|
||||||
}
|
}
|
||||||
|
decomposePushDownFilter();
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setSortFieldAndFields(Field sortField, List<Field> fields) {
|
public void setSortFieldAndFields(Field sortField, List<Field> fields) {
|
||||||
|
@ -113,7 +118,7 @@ public class TopScreenModel {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void refreshSummary(ClusterMetrics clusterMetrics) {
|
private void refreshSummary(ClusterMetrics clusterMetrics) {
|
||||||
String currentTime = DateFormatUtils.ISO_8601_EXTENDED_TIME_FORMAT
|
String currentTime = ISO_8601_EXTENDED_TIME_FORMAT
|
||||||
.format(System.currentTimeMillis());
|
.format(System.currentTimeMillis());
|
||||||
String version = clusterMetrics.getHBaseVersion();
|
String version = clusterMetrics.getHBaseVersion();
|
||||||
String clusterId = clusterMetrics.getClusterId();
|
String clusterId = clusterMetrics.getClusterId();
|
||||||
|
@ -130,7 +135,7 @@ public class TopScreenModel {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void refreshRecords(ClusterMetrics clusterMetrics) {
|
private void refreshRecords(ClusterMetrics clusterMetrics) {
|
||||||
List<Record> records = currentMode.getRecords(clusterMetrics);
|
List<Record> records = currentMode.getRecords(clusterMetrics, pushDownFilters);
|
||||||
|
|
||||||
// Filter and sort
|
// Filter and sort
|
||||||
records = records.stream()
|
records = records.stream()
|
||||||
|
@ -153,13 +158,13 @@ public class TopScreenModel {
|
||||||
if (filter == null) {
|
if (filter == null) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
filters.add(filter);
|
filters.add(filter);
|
||||||
filterHistories.add(filterString);
|
filterHistories.add(filterString);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void clearFilters() {
|
public void clearFilters() {
|
||||||
|
pushDownFilters.clear();
|
||||||
filters.clear();
|
filters.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -203,4 +208,18 @@ public class TopScreenModel {
|
||||||
public List<String> getFilterHistories() {
|
public List<String> getFilterHistories() {
|
||||||
return Collections.unmodifiableList(filterHistories);
|
return Collections.unmodifiableList(filterHistories);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void decomposePushDownFilter() {
|
||||||
|
pushDownFilters.clear();
|
||||||
|
for (RecordFilter filter : filters) {
|
||||||
|
if (!fields.contains(filter.getField())) {
|
||||||
|
pushDownFilters.add(filter);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
filters.removeAll(pushDownFilters);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Collection<? extends RecordFilter> getPushDownFilters() {
|
||||||
|
return pushDownFilters;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
import org.apache.hadoop.hbase.hbtop.Record;
|
import org.apache.hadoop.hbase.hbtop.Record;
|
||||||
|
import org.apache.hadoop.hbase.hbtop.RecordFilter;
|
||||||
import org.apache.hadoop.hbase.hbtop.field.Field;
|
import org.apache.hadoop.hbase.hbtop.field.Field;
|
||||||
import org.apache.hadoop.hbase.hbtop.field.FieldInfo;
|
import org.apache.hadoop.hbase.hbtop.field.FieldInfo;
|
||||||
import org.apache.hadoop.hbase.hbtop.mode.Mode;
|
import org.apache.hadoop.hbase.hbtop.mode.Mode;
|
||||||
|
@ -324,7 +325,8 @@ public class TopScreenPresenter {
|
||||||
}
|
}
|
||||||
|
|
||||||
public ScreenView goToFilterDisplayMode(Screen screen, Terminal terminal, int row) {
|
public ScreenView goToFilterDisplayMode(Screen screen, Terminal terminal, int row) {
|
||||||
return new FilterDisplayModeScreenView(screen, terminal, row, topScreenModel.getFilters(),
|
ArrayList<RecordFilter> filters = new ArrayList<>(topScreenModel.getFilters());
|
||||||
topScreenView);
|
filters.addAll(topScreenModel.getPushDownFilters());
|
||||||
|
return new FilterDisplayModeScreenView(screen, terminal, row, filters, topScreenView);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -27,6 +27,7 @@ import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
import org.apache.commons.lang3.time.FastDateFormat;
|
import org.apache.commons.lang3.time.FastDateFormat;
|
||||||
import org.apache.hadoop.hbase.ClusterMetrics;
|
import org.apache.hadoop.hbase.ClusterMetrics;
|
||||||
import org.apache.hadoop.hbase.ClusterMetricsBuilder;
|
import org.apache.hadoop.hbase.ClusterMetricsBuilder;
|
||||||
|
@ -37,6 +38,8 @@ import org.apache.hadoop.hbase.ServerMetricsBuilder;
|
||||||
import org.apache.hadoop.hbase.ServerName;
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
import org.apache.hadoop.hbase.Size;
|
import org.apache.hadoop.hbase.Size;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.UserMetrics;
|
||||||
|
import org.apache.hadoop.hbase.UserMetricsBuilder;
|
||||||
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
|
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
|
||||||
import org.apache.hadoop.hbase.hbtop.field.Field;
|
import org.apache.hadoop.hbase.hbtop.field.Field;
|
||||||
import org.apache.hadoop.hbase.hbtop.screen.top.Summary;
|
import org.apache.hadoop.hbase.hbtop.screen.top.Summary;
|
||||||
|
@ -54,6 +57,9 @@ public final class TestUtils {
|
||||||
|
|
||||||
// host1
|
// host1
|
||||||
List<RegionMetrics> regionMetricsList = new ArrayList<>();
|
List<RegionMetrics> regionMetricsList = new ArrayList<>();
|
||||||
|
List<UserMetrics> userMetricsList = new ArrayList<>();
|
||||||
|
userMetricsList.add(createUserMetrics("FOO",1,2, 4));
|
||||||
|
userMetricsList.add(createUserMetrics("BAR",2,3, 3));
|
||||||
regionMetricsList.add(createRegionMetrics(
|
regionMetricsList.add(createRegionMetrics(
|
||||||
"table1,,1.00000000000000000000000000000000.",
|
"table1,,1.00000000000000000000000000000000.",
|
||||||
100, 50, 100,
|
100, 50, 100,
|
||||||
|
@ -73,10 +79,13 @@ public final class TestUtils {
|
||||||
ServerName host1 = ServerName.valueOf("host1.apache.com", 1000, 1);
|
ServerName host1 = ServerName.valueOf("host1.apache.com", 1000, 1);
|
||||||
serverMetricsMap.put(host1, createServerMetrics(host1, 100,
|
serverMetricsMap.put(host1, createServerMetrics(host1, 100,
|
||||||
new Size(100, Size.Unit.MEGABYTE), new Size(200, Size.Unit.MEGABYTE), 100,
|
new Size(100, Size.Unit.MEGABYTE), new Size(200, Size.Unit.MEGABYTE), 100,
|
||||||
regionMetricsList));
|
regionMetricsList, userMetricsList));
|
||||||
|
|
||||||
// host2
|
// host2
|
||||||
regionMetricsList.clear();
|
regionMetricsList.clear();
|
||||||
|
userMetricsList.clear();
|
||||||
|
userMetricsList.add(createUserMetrics("FOO",5,7, 3));
|
||||||
|
userMetricsList.add(createUserMetrics("BAR",4,8, 4));
|
||||||
regionMetricsList.add(createRegionMetrics(
|
regionMetricsList.add(createRegionMetrics(
|
||||||
"table1,1,4.00000000000000000000000000000003.",
|
"table1,1,4.00000000000000000000000000000003.",
|
||||||
100, 50, 100,
|
100, 50, 100,
|
||||||
|
@ -96,7 +105,7 @@ public final class TestUtils {
|
||||||
ServerName host2 = ServerName.valueOf("host2.apache.com", 1001, 2);
|
ServerName host2 = ServerName.valueOf("host2.apache.com", 1001, 2);
|
||||||
serverMetricsMap.put(host2, createServerMetrics(host2, 200,
|
serverMetricsMap.put(host2, createServerMetrics(host2, 200,
|
||||||
new Size(16, Size.Unit.GIGABYTE), new Size(32, Size.Unit.GIGABYTE), 200,
|
new Size(16, Size.Unit.GIGABYTE), new Size(32, Size.Unit.GIGABYTE), 200,
|
||||||
regionMetricsList));
|
regionMetricsList, userMetricsList));
|
||||||
|
|
||||||
ServerName host3 = ServerName.valueOf("host3.apache.com", 1002, 3);
|
ServerName host3 = ServerName.valueOf("host3.apache.com", 1002, 3);
|
||||||
return ClusterMetricsBuilder.newBuilder()
|
return ClusterMetricsBuilder.newBuilder()
|
||||||
|
@ -117,6 +126,15 @@ public final class TestUtils {
|
||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static UserMetrics createUserMetrics(String user, long readRequestCount,
|
||||||
|
long writeRequestCount, long filteredReadRequestsCount) {
|
||||||
|
return UserMetricsBuilder.newBuilder(Bytes.toBytes(user)).addClientMetris(
|
||||||
|
new UserMetricsBuilder.ClientMetricsImpl("CLIENT_A_" + user, readRequestCount,
|
||||||
|
writeRequestCount, filteredReadRequestsCount)).addClientMetris(
|
||||||
|
new UserMetricsBuilder.ClientMetricsImpl("CLIENT_B_" + user, readRequestCount,
|
||||||
|
writeRequestCount, filteredReadRequestsCount)).build();
|
||||||
|
}
|
||||||
|
|
||||||
private static RegionMetrics createRegionMetrics(String regionName, long readRequestCount,
|
private static RegionMetrics createRegionMetrics(String regionName, long readRequestCount,
|
||||||
long filteredReadRequestCount, long writeRequestCount, Size storeFileSize,
|
long filteredReadRequestCount, long writeRequestCount, Size storeFileSize,
|
||||||
Size uncompressedStoreFileSize, int storeFileCount, Size memStoreSize, float locality,
|
Size uncompressedStoreFileSize, int storeFileCount, Size memStoreSize, float locality,
|
||||||
|
@ -139,14 +157,15 @@ public final class TestUtils {
|
||||||
|
|
||||||
private static ServerMetrics createServerMetrics(ServerName serverName, long reportTimestamp,
|
private static ServerMetrics createServerMetrics(ServerName serverName, long reportTimestamp,
|
||||||
Size usedHeapSize, Size maxHeapSize, long requestCountPerSecond,
|
Size usedHeapSize, Size maxHeapSize, long requestCountPerSecond,
|
||||||
List<RegionMetrics> regionMetricsList) {
|
List<RegionMetrics> regionMetricsList, List<UserMetrics> userMetricsList) {
|
||||||
|
|
||||||
return ServerMetricsBuilder.newBuilder(serverName)
|
return ServerMetricsBuilder.newBuilder(serverName)
|
||||||
.setReportTimestamp(reportTimestamp)
|
.setReportTimestamp(reportTimestamp)
|
||||||
.setUsedHeapSize(usedHeapSize)
|
.setUsedHeapSize(usedHeapSize)
|
||||||
.setMaxHeapSize(maxHeapSize)
|
.setMaxHeapSize(maxHeapSize)
|
||||||
.setRequestCountPerSecond(requestCountPerSecond)
|
.setRequestCountPerSecond(requestCountPerSecond)
|
||||||
.setRegionMetrics(regionMetricsList).build();
|
.setRegionMetrics(regionMetricsList)
|
||||||
|
.setUserMetrics(userMetricsList).build();
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void assertRecordsInRegionMode(List<Record> records) {
|
public static void assertRecordsInRegionMode(List<Record> records) {
|
||||||
|
@ -316,10 +335,78 @@ public final class TestUtils {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static void assertRecordsInUserMode(List<Record> records) {
|
||||||
|
assertThat(records.size(), is(2));
|
||||||
|
for (Record record : records) {
|
||||||
|
String user = record.get(Field.USER).asString();
|
||||||
|
switch (user) {
|
||||||
|
//readRequestPerSecond and writeRequestPerSecond will be zero
|
||||||
|
// because there is no change or new metrics during refresh
|
||||||
|
case "FOO":
|
||||||
|
assertRecordInUserMode(record, 0L, 0L, 0L);
|
||||||
|
break;
|
||||||
|
case "BAR":
|
||||||
|
assertRecordInUserMode(record, 0L, 0L, 0L);
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
fail();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void assertRecordsInClientMode(List<Record> records) {
|
||||||
|
assertThat(records.size(), is(4));
|
||||||
|
for (Record record : records) {
|
||||||
|
String client = record.get(Field.CLIENT).asString();
|
||||||
|
switch (client) {
|
||||||
|
//readRequestPerSecond and writeRequestPerSecond will be zero
|
||||||
|
// because there is no change or new metrics during refresh
|
||||||
|
case "CLIENT_A_FOO":
|
||||||
|
assertRecordInClientMode(record, 0L, 0L, 0L);
|
||||||
|
break;
|
||||||
|
case "CLIENT_A_BAR":
|
||||||
|
assertRecordInClientMode(record, 0L, 0L, 0L);
|
||||||
|
break;
|
||||||
|
case "CLIENT_B_FOO":
|
||||||
|
assertRecordInClientMode(record, 0L, 0L, 0L);
|
||||||
|
break;
|
||||||
|
case "CLIENT_B_BAR":
|
||||||
|
assertRecordInClientMode(record, 0L, 0L, 0L);
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
fail();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void assertRecordInUserMode(Record record, long readRequestCountPerSecond,
|
||||||
|
long writeCountRequestPerSecond, long filteredReadRequestsCount) {
|
||||||
|
assertThat(record.size(), is(6));
|
||||||
|
assertThat(record.get(Field.READ_REQUEST_COUNT_PER_SECOND).asLong(),
|
||||||
|
is(readRequestCountPerSecond));
|
||||||
|
assertThat(record.get(Field.WRITE_REQUEST_COUNT_PER_SECOND).asLong(),
|
||||||
|
is(writeCountRequestPerSecond));
|
||||||
|
assertThat(record.get(Field.FILTERED_READ_REQUEST_COUNT_PER_SECOND).asLong(),
|
||||||
|
is(filteredReadRequestsCount));
|
||||||
|
assertThat(record.get(Field.CLIENT_COUNT).asInt(), is(2));
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void assertRecordInClientMode(Record record, long readRequestCountPerSecond,
|
||||||
|
long writeCountRequestPerSecond, long filteredReadRequestsCount) {
|
||||||
|
assertThat(record.size(), is(6));
|
||||||
|
assertThat(record.get(Field.READ_REQUEST_COUNT_PER_SECOND).asLong(),
|
||||||
|
is(readRequestCountPerSecond));
|
||||||
|
assertThat(record.get(Field.WRITE_REQUEST_COUNT_PER_SECOND).asLong(),
|
||||||
|
is(writeCountRequestPerSecond));
|
||||||
|
assertThat(record.get(Field.FILTERED_READ_REQUEST_COUNT_PER_SECOND).asLong(),
|
||||||
|
is(filteredReadRequestsCount));
|
||||||
|
assertThat(record.get(Field.USER_COUNT).asInt(), is(1));
|
||||||
|
}
|
||||||
|
|
||||||
private static void assertRecordInTableMode(Record record, long requestCountPerSecond,
|
private static void assertRecordInTableMode(Record record, long requestCountPerSecond,
|
||||||
long readRequestCountPerSecond, long filteredReadRequestCountPerSecond,
|
long readRequestCountPerSecond, long filteredReadRequestCountPerSecond,
|
||||||
long writeCountRequestPerSecond, Size storeFileSize, Size uncompressedStoreFileSize,
|
long writeCountRequestPerSecond, Size storeFileSize, Size uncompressedStoreFileSize,
|
||||||
int numStoreFiles, Size memStoreSize, int regionCount) {
|
int numStoreFiles, Size memStoreSize, int regionCount) {
|
||||||
assertThat(record.size(), is(11));
|
assertThat(record.size(), is(11));
|
||||||
assertThat(record.get(Field.REQUEST_COUNT_PER_SECOND).asLong(),
|
assertThat(record.get(Field.REQUEST_COUNT_PER_SECOND).asLong(),
|
||||||
is(requestCountPerSecond));
|
is(requestCountPerSecond));
|
||||||
|
|
|
@ -0,0 +1,72 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.hbtop.mode;
|
||||||
|
|
||||||
|
import static org.hamcrest.CoreMatchers.is;
|
||||||
|
import static org.junit.Assert.assertThat;
|
||||||
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
|
import org.apache.hadoop.hbase.hbtop.Record;
|
||||||
|
import org.apache.hadoop.hbase.hbtop.TestUtils;
|
||||||
|
import org.apache.hadoop.hbase.hbtop.field.Field;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||||
|
import org.junit.ClassRule;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
|
@Category(SmallTests.class)
|
||||||
|
public class ClientModeTest extends ModeTestBase {
|
||||||
|
|
||||||
|
@ClassRule public static final HBaseClassTestRule CLASS_RULE =
|
||||||
|
HBaseClassTestRule.forClass(ClientModeTest.class);
|
||||||
|
|
||||||
|
@Override protected Mode getMode() {
|
||||||
|
return Mode.CLIENT;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override protected void assertRecords(List<Record> records) {
|
||||||
|
TestUtils.assertRecordsInClientMode(records);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override protected void assertDrillDown(Record currentRecord, DrillDownInfo drillDownInfo) {
|
||||||
|
assertThat(drillDownInfo.getNextMode(), is(Mode.USER));
|
||||||
|
assertThat(drillDownInfo.getInitialFilters().size(), is(1));
|
||||||
|
String client = currentRecord.get(Field.CLIENT).asString();
|
||||||
|
switch (client) {
|
||||||
|
case "CLIENT_A_FOO":
|
||||||
|
assertThat(drillDownInfo.getInitialFilters().get(0).toString(), is("CLIENT==CLIENT_A_FOO"));
|
||||||
|
break;
|
||||||
|
|
||||||
|
case "CLIENT_B_FOO":
|
||||||
|
assertThat(drillDownInfo.getInitialFilters().get(0).toString(), is("CLIENT==CLIENT_B_FOO"));
|
||||||
|
break;
|
||||||
|
|
||||||
|
case "CLIENT_A_BAR":
|
||||||
|
assertThat(drillDownInfo.getInitialFilters().get(0).toString(), is("CLIENT==CLIENT_A_BAR"));
|
||||||
|
break;
|
||||||
|
case "CLIENT_B_BAR":
|
||||||
|
assertThat(drillDownInfo.getInitialFilters().get(0).toString(), is("CLIENT==CLIENT_B_BAR"));
|
||||||
|
break;
|
||||||
|
|
||||||
|
default:
|
||||||
|
fail();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -27,7 +27,8 @@ public abstract class ModeTestBase {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testGetRecords() {
|
public void testGetRecords() {
|
||||||
List<Record> records = getMode().getRecords(TestUtils.createDummyClusterMetrics());
|
List<Record> records = getMode().getRecords(TestUtils.createDummyClusterMetrics(),
|
||||||
|
null);
|
||||||
assertRecords(records);
|
assertRecords(records);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -36,7 +37,8 @@ public abstract class ModeTestBase {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testDrillDown() {
|
public void testDrillDown() {
|
||||||
List<Record> records = getMode().getRecords(TestUtils.createDummyClusterMetrics());
|
List<Record> records = getMode().getRecords(TestUtils.createDummyClusterMetrics(),
|
||||||
|
null);
|
||||||
for (Record record : records) {
|
for (Record record : records) {
|
||||||
assertDrillDown(record, getMode().drillDown(record));
|
assertDrillDown(record, getMode().drillDown(record));
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,70 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.hbtop.mode;
|
||||||
|
|
||||||
|
import static org.hamcrest.CoreMatchers.is;
|
||||||
|
import static org.junit.Assert.assertThat;
|
||||||
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
|
import org.apache.hadoop.hbase.hbtop.Record;
|
||||||
|
import org.apache.hadoop.hbase.hbtop.TestUtils;
|
||||||
|
import org.apache.hadoop.hbase.hbtop.field.Field;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||||
|
import org.junit.ClassRule;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
|
@Category(SmallTests.class)
|
||||||
|
public class UserModeTest extends ModeTestBase {
|
||||||
|
|
||||||
|
@ClassRule
|
||||||
|
public static final HBaseClassTestRule CLASS_RULE =
|
||||||
|
HBaseClassTestRule.forClass(UserModeTest.class);
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected Mode getMode() {
|
||||||
|
return Mode.USER;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void assertRecords(List<Record> records) {
|
||||||
|
TestUtils.assertRecordsInUserMode(records);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void assertDrillDown(Record currentRecord, DrillDownInfo drillDownInfo) {
|
||||||
|
assertThat(drillDownInfo.getNextMode(), is(Mode.CLIENT));
|
||||||
|
assertThat(drillDownInfo.getInitialFilters().size(), is(1));
|
||||||
|
String userName = currentRecord.get(Field.USER).asString();
|
||||||
|
|
||||||
|
switch (userName) {
|
||||||
|
case "FOO":
|
||||||
|
assertThat(drillDownInfo.getInitialFilters().get(0).toString(), is("USER==FOO"));
|
||||||
|
break;
|
||||||
|
|
||||||
|
case "BAR":
|
||||||
|
assertThat(drillDownInfo.getInitialFilters().get(0).toString(), is("USER==BAR"));
|
||||||
|
break;
|
||||||
|
|
||||||
|
default:
|
||||||
|
fail();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -157,6 +157,29 @@ message RegionLoad {
|
||||||
optional int32 max_store_file_ref_count = 22 [default = 0];
|
optional int32 max_store_file_ref_count = 22 [default = 0];
|
||||||
}
|
}
|
||||||
|
|
||||||
|
message UserLoad {
|
||||||
|
|
||||||
|
/** short user name */
|
||||||
|
required string userName = 1;
|
||||||
|
|
||||||
|
/** Metrics for all clients of a user */
|
||||||
|
repeated ClientMetrics clientMetrics = 2;
|
||||||
|
}
|
||||||
|
|
||||||
|
message ClientMetrics {
|
||||||
|
/** client host name */
|
||||||
|
required string hostName = 1;
|
||||||
|
|
||||||
|
/** the current total read requests made from a client */
|
||||||
|
optional uint64 read_requests_count = 2;
|
||||||
|
|
||||||
|
/** the current total write requests made from a client */
|
||||||
|
optional uint64 write_requests_count = 3;
|
||||||
|
|
||||||
|
/** the current total filtered requests made from a client */
|
||||||
|
optional uint64 filtered_requests_count = 4;
|
||||||
|
}
|
||||||
|
|
||||||
/* Server-level protobufs */
|
/* Server-level protobufs */
|
||||||
|
|
||||||
message ReplicationLoadSink {
|
message ReplicationLoadSink {
|
||||||
|
@ -230,6 +253,11 @@ message ServerLoad {
|
||||||
* The replicationLoadSink for the replication Sink status of this region server.
|
* The replicationLoadSink for the replication Sink status of this region server.
|
||||||
*/
|
*/
|
||||||
optional ReplicationLoadSink replLoadSink = 11;
|
optional ReplicationLoadSink replLoadSink = 11;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The metrics for each user on this region server
|
||||||
|
*/
|
||||||
|
repeated UserLoad userLoads = 12;
|
||||||
}
|
}
|
||||||
|
|
||||||
message LiveServerInfo {
|
message LiveServerInfo {
|
||||||
|
|
|
@ -153,6 +153,32 @@ message RegionLoad {
|
||||||
optional int32 max_store_file_ref_count = 22 [default = 0];
|
optional int32 max_store_file_ref_count = 22 [default = 0];
|
||||||
}
|
}
|
||||||
|
|
||||||
|
message UserLoad {
|
||||||
|
|
||||||
|
/** short user name */
|
||||||
|
required string userName = 1;
|
||||||
|
|
||||||
|
/** Metrics for all clients of a user */
|
||||||
|
repeated ClientMetrics clientMetrics = 2;
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
message ClientMetrics {
|
||||||
|
/** client host name */
|
||||||
|
required string hostName = 1;
|
||||||
|
|
||||||
|
/** the current total read requests made from a client */
|
||||||
|
optional uint64 read_requests_count = 2;
|
||||||
|
|
||||||
|
/** the current total write requests made from a client */
|
||||||
|
optional uint64 write_requests_count = 3;
|
||||||
|
|
||||||
|
/** the current total filtered requests made from a client */
|
||||||
|
optional uint64 filtered_requests_count = 4;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
/* Server-level protobufs */
|
/* Server-level protobufs */
|
||||||
|
|
||||||
message ReplicationLoadSink {
|
message ReplicationLoadSink {
|
||||||
|
@ -219,6 +245,11 @@ message ServerLoad {
|
||||||
* The replicationLoadSink for the replication Sink status of this region server.
|
* The replicationLoadSink for the replication Sink status of this region server.
|
||||||
*/
|
*/
|
||||||
optional ReplicationLoadSink replLoadSink = 11;
|
optional ReplicationLoadSink replLoadSink = 11;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The metrics for each user on this region server
|
||||||
|
*/
|
||||||
|
repeated UserLoad userLoads = 12;
|
||||||
}
|
}
|
||||||
|
|
||||||
message LiveServerInfo {
|
message LiveServerInfo {
|
||||||
|
|
|
@ -843,7 +843,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||||
// TODO: revisit if coprocessors should load in other cases
|
// TODO: revisit if coprocessors should load in other cases
|
||||||
this.coprocessorHost = new RegionCoprocessorHost(this, rsServices, conf);
|
this.coprocessorHost = new RegionCoprocessorHost(this, rsServices, conf);
|
||||||
this.metricsRegionWrapper = new MetricsRegionWrapperImpl(this);
|
this.metricsRegionWrapper = new MetricsRegionWrapperImpl(this);
|
||||||
this.metricsRegion = new MetricsRegion(this.metricsRegionWrapper);
|
this.metricsRegion = new MetricsRegion(this.metricsRegionWrapper, conf);
|
||||||
} else {
|
} else {
|
||||||
this.metricsRegionWrapper = null;
|
this.metricsRegionWrapper = null;
|
||||||
this.metricsRegion = null;
|
this.metricsRegion = null;
|
||||||
|
@ -6582,6 +6582,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||||
|
|
||||||
if (!outResults.isEmpty()) {
|
if (!outResults.isEmpty()) {
|
||||||
readRequestsCount.increment();
|
readRequestsCount.increment();
|
||||||
|
if (metricsRegion != null) {
|
||||||
|
metricsRegion.updateReadRequestCount();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if (rsServices != null && rsServices.getMetrics() != null) {
|
if (rsServices != null && rsServices.getMetrics() != null) {
|
||||||
rsServices.getMetrics().updateReadQueryMeter(getRegionInfo().getTable());
|
rsServices.getMetrics().updateReadQueryMeter(getRegionInfo().getTable());
|
||||||
|
@ -6909,6 +6912,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||||
|
|
||||||
protected void incrementCountOfRowsFilteredMetric(ScannerContext scannerContext) {
|
protected void incrementCountOfRowsFilteredMetric(ScannerContext scannerContext) {
|
||||||
filteredReadRequestsCount.increment();
|
filteredReadRequestsCount.increment();
|
||||||
|
if (metricsRegion != null) {
|
||||||
|
metricsRegion.updateFilteredRecords();
|
||||||
|
}
|
||||||
|
|
||||||
if (scannerContext == null || !scannerContext.isTrackingMetrics()) return;
|
if (scannerContext == null || !scannerContext.isTrackingMetrics()) return;
|
||||||
|
|
||||||
|
|
|
@ -56,6 +56,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
import javax.management.MalformedObjectNameException;
|
import javax.management.MalformedObjectNameException;
|
||||||
import javax.servlet.http.HttpServlet;
|
import javax.servlet.http.HttpServlet;
|
||||||
|
|
||||||
import org.apache.commons.lang3.RandomUtils;
|
import org.apache.commons.lang3.RandomUtils;
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.apache.commons.lang3.SystemUtils;
|
import org.apache.commons.lang3.SystemUtils;
|
||||||
|
@ -208,6 +209,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Coprocesso
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.UserLoad;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.Coprocessor;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.Coprocessor;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.Coprocessor.Builder;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.Coprocessor.Builder;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair;
|
||||||
|
@ -1379,7 +1381,14 @@ public class HRegionServer extends HasThread implements
|
||||||
} else {
|
} else {
|
||||||
serverLoad.setInfoServerPort(-1);
|
serverLoad.setInfoServerPort(-1);
|
||||||
}
|
}
|
||||||
|
MetricsUserAggregateSource userSource =
|
||||||
|
metricsRegionServer.getMetricsUserAggregate().getSource();
|
||||||
|
if (userSource != null) {
|
||||||
|
Map<String, MetricsUserSource> userMetricMap = userSource.getUserSources();
|
||||||
|
for (Entry<String, MetricsUserSource> entry : userMetricMap.entrySet()) {
|
||||||
|
serverLoad.addUserLoads(createUserLoad(entry.getKey(), entry.getValue()));
|
||||||
|
}
|
||||||
|
}
|
||||||
// for the replicationLoad purpose. Only need to get from one executorService
|
// for the replicationLoad purpose. Only need to get from one executorService
|
||||||
// either source or sink will get the same info
|
// either source or sink will get the same info
|
||||||
ReplicationSourceService rsources = getReplicationSourceService();
|
ReplicationSourceService rsources = getReplicationSourceService();
|
||||||
|
@ -1717,6 +1726,19 @@ public class HRegionServer extends HasThread implements
|
||||||
return regionLoadBldr.build();
|
return regionLoadBldr.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private UserLoad createUserLoad(String user, MetricsUserSource userSource) {
|
||||||
|
UserLoad.Builder userLoadBldr = UserLoad.newBuilder();
|
||||||
|
userLoadBldr.setUserName(user);
|
||||||
|
userSource.getClientMetrics().values().stream().map(
|
||||||
|
clientMetrics -> ClusterStatusProtos.ClientMetrics.newBuilder()
|
||||||
|
.setHostName(clientMetrics.getHostName())
|
||||||
|
.setWriteRequestsCount(clientMetrics.getWriteRequestsCount())
|
||||||
|
.setFilteredRequestsCount(clientMetrics.getFilteredReadRequests())
|
||||||
|
.setReadRequestsCount(clientMetrics.getReadRequestsCount()).build())
|
||||||
|
.forEach(userLoadBldr::addClientMetrics);
|
||||||
|
return userLoadBldr.build();
|
||||||
|
}
|
||||||
|
|
||||||
public RegionLoad createRegionLoad(final String encodedRegionName) throws IOException {
|
public RegionLoad createRegionLoad(final String encodedRegionName) throws IOException {
|
||||||
HRegion r = onlineRegions.get(encodedRegionName);
|
HRegion r = onlineRegions.get(encodedRegionName);
|
||||||
return r != null ? createRegionLoad(r, null, null) : null;
|
return r != null ? createRegionLoad(r, null, null) : null;
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.regionserver;
|
package org.apache.hadoop.hbase.regionserver;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
|
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
|
||||||
|
|
||||||
|
@ -29,12 +30,14 @@ import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public class MetricsRegion {
|
public class MetricsRegion {
|
||||||
private final MetricsRegionSource source;
|
private final MetricsRegionSource source;
|
||||||
|
private final MetricsUserAggregate userAggregate;
|
||||||
private MetricsRegionWrapper regionWrapper;
|
private MetricsRegionWrapper regionWrapper;
|
||||||
|
|
||||||
public MetricsRegion(final MetricsRegionWrapper wrapper) {
|
public MetricsRegion(final MetricsRegionWrapper wrapper, Configuration conf) {
|
||||||
source = CompatibilitySingletonFactory.getInstance(MetricsRegionServerSourceFactory.class)
|
source = CompatibilitySingletonFactory.getInstance(MetricsRegionServerSourceFactory.class)
|
||||||
.createRegion(wrapper);
|
.createRegion(wrapper);
|
||||||
this.regionWrapper = wrapper;
|
this.regionWrapper = wrapper;
|
||||||
|
userAggregate = MetricsUserAggregateFactory.getMetricsUserAggregate(conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void close() {
|
public void close() {
|
||||||
|
@ -57,6 +60,9 @@ public class MetricsRegion {
|
||||||
source.updateScanTime(t);
|
source.updateScanTime(t);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void updateFilteredRecords(){
|
||||||
|
userAggregate.updateFilteredReadRequests();
|
||||||
|
}
|
||||||
public void updateAppend() {
|
public void updateAppend() {
|
||||||
source.updateAppend();
|
source.updateAppend();
|
||||||
}
|
}
|
||||||
|
@ -73,4 +79,7 @@ public class MetricsRegion {
|
||||||
return regionWrapper;
|
return regionWrapper;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void updateReadRequestCount() {
|
||||||
|
userAggregate.updateReadRequestCount();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -101,7 +101,7 @@ public class MetricsRegionServer {
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public org.apache.hadoop.hbase.regionserver.MetricsUserAggregate getMetricsUserAggregate() {
|
public MetricsUserAggregate getMetricsUserAggregate() {
|
||||||
return userAggregate;
|
return userAggregate;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -23,6 +23,11 @@ import org.apache.yetus.audience.InterfaceAudience;
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public interface MetricsUserAggregate {
|
public interface MetricsUserAggregate {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return return a singleton instance of MetricsUserAggregateSource or null in case of NoOp
|
||||||
|
*/
|
||||||
|
MetricsUserAggregateSource getSource();
|
||||||
|
|
||||||
void updatePut(long t);
|
void updatePut(long t);
|
||||||
|
|
||||||
void updateDelete(long t);
|
void updateDelete(long t);
|
||||||
|
@ -36,4 +41,8 @@ public interface MetricsUserAggregate {
|
||||||
void updateReplay(long t);
|
void updateReplay(long t);
|
||||||
|
|
||||||
void updateScanTime(long t);
|
void updateScanTime(long t);
|
||||||
|
|
||||||
|
void updateFilteredReadRequests();
|
||||||
|
|
||||||
|
void updateReadRequestCount();
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,7 +18,6 @@
|
||||||
|
|
||||||
package org.apache.hadoop.hbase.regionserver;
|
package org.apache.hadoop.hbase.regionserver;
|
||||||
|
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
|
||||||
|
@ -27,6 +26,7 @@ public class MetricsUserAggregateFactory {
|
||||||
private MetricsUserAggregateFactory() {
|
private MetricsUserAggregateFactory() {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public static final String METRIC_USER_ENABLED_CONF = "hbase.regionserver.user.metrics.enabled";
|
public static final String METRIC_USER_ENABLED_CONF = "hbase.regionserver.user.metrics.enabled";
|
||||||
public static final boolean DEFAULT_METRIC_USER_ENABLED_CONF = true;
|
public static final boolean DEFAULT_METRIC_USER_ENABLED_CONF = true;
|
||||||
|
|
||||||
|
@ -36,6 +36,10 @@ public class MetricsUserAggregateFactory {
|
||||||
} else {
|
} else {
|
||||||
//NoOpMetricUserAggregate
|
//NoOpMetricUserAggregate
|
||||||
return new MetricsUserAggregate() {
|
return new MetricsUserAggregate() {
|
||||||
|
@Override public MetricsUserAggregateSource getSource() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
@Override public void updatePut(long t) {
|
@Override public void updatePut(long t) {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -63,6 +67,14 @@ public class MetricsUserAggregateFactory {
|
||||||
@Override public void updateScanTime(long t) {
|
@Override public void updateScanTime(long t) {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override public void updateFilteredReadRequests() {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public void updateReadRequestCount() {
|
||||||
|
|
||||||
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
package org.apache.hadoop.hbase.regionserver;
|
package org.apache.hadoop.hbase.regionserver;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.net.InetAddress;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
@ -29,8 +30,6 @@ import org.apache.hadoop.hbase.security.UserProvider;
|
||||||
import org.apache.hadoop.hbase.util.LossyCounting;
|
import org.apache.hadoop.hbase.util.LossyCounting;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
|
||||||
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
|
||||||
|
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public class MetricsUserAggregateImpl implements MetricsUserAggregate{
|
public class MetricsUserAggregateImpl implements MetricsUserAggregate{
|
||||||
|
|
||||||
|
@ -65,8 +64,8 @@ public class MetricsUserAggregateImpl implements MetricsUserAggregate{
|
||||||
return user.isPresent() ? user.get().getShortName() : null;
|
return user.isPresent() ? user.get().getShortName() : null;
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@Override
|
||||||
MetricsUserAggregateSource getSource() {
|
public MetricsUserAggregateSource getSource() {
|
||||||
return source;
|
return source;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -74,7 +73,39 @@ public class MetricsUserAggregateImpl implements MetricsUserAggregate{
|
||||||
public void updatePut(long t) {
|
public void updatePut(long t) {
|
||||||
String user = getActiveUser();
|
String user = getActiveUser();
|
||||||
if (user != null) {
|
if (user != null) {
|
||||||
getOrCreateMetricsUser(user).updatePut(t);
|
MetricsUserSource userSource = getOrCreateMetricsUser(user);
|
||||||
|
userSource.updatePut(t);
|
||||||
|
incrementClientWriteMetrics(userSource);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private String getClient() {
|
||||||
|
Optional<InetAddress> ipOptional = RpcServer.getRemoteAddress();
|
||||||
|
if (ipOptional.isPresent()) {
|
||||||
|
return ipOptional.get().getHostName();
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void incrementClientReadMetrics(MetricsUserSource userSource) {
|
||||||
|
String client = getClient();
|
||||||
|
if (client != null && userSource != null) {
|
||||||
|
userSource.getOrCreateMetricsClient(client).incrementReadRequest();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void incrementFilteredReadRequests(MetricsUserSource userSource) {
|
||||||
|
String client = getClient();
|
||||||
|
if (client != null && userSource != null) {
|
||||||
|
userSource.getOrCreateMetricsClient(client).incrementFilteredReadRequests();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void incrementClientWriteMetrics(MetricsUserSource userSource) {
|
||||||
|
String client = getClient();
|
||||||
|
if (client != null && userSource != null) {
|
||||||
|
userSource.getOrCreateMetricsClient(client).incrementWriteRequest();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -82,7 +113,9 @@ public class MetricsUserAggregateImpl implements MetricsUserAggregate{
|
||||||
public void updateDelete(long t) {
|
public void updateDelete(long t) {
|
||||||
String user = getActiveUser();
|
String user = getActiveUser();
|
||||||
if (user != null) {
|
if (user != null) {
|
||||||
getOrCreateMetricsUser(user).updateDelete(t);
|
MetricsUserSource userSource = getOrCreateMetricsUser(user);
|
||||||
|
userSource.updateDelete(t);
|
||||||
|
incrementClientWriteMetrics(userSource);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -90,7 +123,8 @@ public class MetricsUserAggregateImpl implements MetricsUserAggregate{
|
||||||
public void updateGet(long t) {
|
public void updateGet(long t) {
|
||||||
String user = getActiveUser();
|
String user = getActiveUser();
|
||||||
if (user != null) {
|
if (user != null) {
|
||||||
getOrCreateMetricsUser(user).updateGet(t);
|
MetricsUserSource userSource = getOrCreateMetricsUser(user);
|
||||||
|
userSource.updateGet(t);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -98,7 +132,9 @@ public class MetricsUserAggregateImpl implements MetricsUserAggregate{
|
||||||
public void updateIncrement(long t) {
|
public void updateIncrement(long t) {
|
||||||
String user = getActiveUser();
|
String user = getActiveUser();
|
||||||
if (user != null) {
|
if (user != null) {
|
||||||
getOrCreateMetricsUser(user).updateIncrement(t);
|
MetricsUserSource userSource = getOrCreateMetricsUser(user);
|
||||||
|
userSource.updateIncrement(t);
|
||||||
|
incrementClientWriteMetrics(userSource);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -106,7 +142,9 @@ public class MetricsUserAggregateImpl implements MetricsUserAggregate{
|
||||||
public void updateAppend(long t) {
|
public void updateAppend(long t) {
|
||||||
String user = getActiveUser();
|
String user = getActiveUser();
|
||||||
if (user != null) {
|
if (user != null) {
|
||||||
getOrCreateMetricsUser(user).updateAppend(t);
|
MetricsUserSource userSource = getOrCreateMetricsUser(user);
|
||||||
|
userSource.updateAppend(t);
|
||||||
|
incrementClientWriteMetrics(userSource);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -114,7 +152,9 @@ public class MetricsUserAggregateImpl implements MetricsUserAggregate{
|
||||||
public void updateReplay(long t) {
|
public void updateReplay(long t) {
|
||||||
String user = getActiveUser();
|
String user = getActiveUser();
|
||||||
if (user != null) {
|
if (user != null) {
|
||||||
getOrCreateMetricsUser(user).updateReplay(t);
|
MetricsUserSource userSource = getOrCreateMetricsUser(user);
|
||||||
|
userSource.updateReplay(t);
|
||||||
|
incrementClientWriteMetrics(userSource);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -122,7 +162,24 @@ public class MetricsUserAggregateImpl implements MetricsUserAggregate{
|
||||||
public void updateScanTime(long t) {
|
public void updateScanTime(long t) {
|
||||||
String user = getActiveUser();
|
String user = getActiveUser();
|
||||||
if (user != null) {
|
if (user != null) {
|
||||||
getOrCreateMetricsUser(user).updateScanTime(t);
|
MetricsUserSource userSource = getOrCreateMetricsUser(user);
|
||||||
|
userSource.updateScanTime(t);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public void updateFilteredReadRequests() {
|
||||||
|
String user = getActiveUser();
|
||||||
|
if (user != null) {
|
||||||
|
MetricsUserSource userSource = getOrCreateMetricsUser(user);
|
||||||
|
incrementFilteredReadRequests(userSource);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public void updateReadRequestCount() {
|
||||||
|
String user = getActiveUser();
|
||||||
|
if (user != null) {
|
||||||
|
MetricsUserSource userSource = getOrCreateMetricsUser(user);
|
||||||
|
incrementClientReadMetrics(userSource);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -18,28 +18,39 @@
|
||||||
package org.apache.hadoop.hbase;
|
package org.apache.hadoop.hbase;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.security.PrivilegedAction;
|
||||||
import java.util.EnumSet;
|
import java.util.EnumSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.ClusterMetrics.Option;
|
import org.apache.hadoop.hbase.ClusterMetrics.Option;
|
||||||
import org.apache.hadoop.hbase.Waiter.Predicate;
|
import org.apache.hadoop.hbase.Waiter.Predicate;
|
||||||
import org.apache.hadoop.hbase.client.Admin;
|
import org.apache.hadoop.hbase.client.Admin;
|
||||||
import org.apache.hadoop.hbase.client.AsyncAdmin;
|
import org.apache.hadoop.hbase.client.AsyncAdmin;
|
||||||
import org.apache.hadoop.hbase.client.AsyncConnection;
|
import org.apache.hadoop.hbase.client.AsyncConnection;
|
||||||
|
import org.apache.hadoop.hbase.client.Connection;
|
||||||
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||||
|
import org.apache.hadoop.hbase.client.Get;
|
||||||
import org.apache.hadoop.hbase.client.Put;
|
import org.apache.hadoop.hbase.client.Put;
|
||||||
|
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
|
||||||
import org.apache.hadoop.hbase.client.RegionStatesCount;
|
import org.apache.hadoop.hbase.client.RegionStatesCount;
|
||||||
|
import org.apache.hadoop.hbase.client.Result;
|
||||||
|
import org.apache.hadoop.hbase.client.Scan;
|
||||||
import org.apache.hadoop.hbase.client.Table;
|
import org.apache.hadoop.hbase.client.Table;
|
||||||
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
|
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
|
||||||
import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor;
|
import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor;
|
||||||
import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
|
import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
|
||||||
import org.apache.hadoop.hbase.coprocessor.MasterObserver;
|
import org.apache.hadoop.hbase.coprocessor.MasterObserver;
|
||||||
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
|
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
|
||||||
|
import org.apache.hadoop.hbase.filter.FilterAllFilter;
|
||||||
import org.apache.hadoop.hbase.master.HMaster;
|
import org.apache.hadoop.hbase.master.HMaster;
|
||||||
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||||
|
import org.apache.hadoop.hbase.security.User;
|
||||||
|
import org.apache.hadoop.hbase.security.UserProvider;
|
||||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
|
import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
|
||||||
|
@ -238,6 +249,137 @@ public class TestClientClusterMetrics {
|
||||||
Assert.assertEquals(MASTERS - 1, metrics.getBackupMasterNames().size());
|
Assert.assertEquals(MASTERS - 1, metrics.getBackupMasterNames().size());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test public void testUserMetrics() throws Exception {
|
||||||
|
Configuration conf = UTIL.getConfiguration();
|
||||||
|
User userFoo = User.createUserForTesting(conf, "FOO_USER_METRIC_TEST", new String[0]);
|
||||||
|
User userBar = User.createUserForTesting(conf, "BAR_USER_METRIC_TEST", new String[0]);
|
||||||
|
User userTest = User.createUserForTesting(conf, "TEST_USER_METRIC_TEST", new String[0]);
|
||||||
|
UTIL.createTable(TABLE_NAME, CF);
|
||||||
|
waitForUsersMetrics(0);
|
||||||
|
long writeMetaMetricBeforeNextuser = getMetaMetrics().getWriteRequestCount();
|
||||||
|
userFoo.runAs(new PrivilegedAction<Void>() {
|
||||||
|
@Override public Void run() {
|
||||||
|
try {
|
||||||
|
doPut();
|
||||||
|
} catch (IOException e) {
|
||||||
|
Assert.fail("Exception:" + e.getMessage());
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
waitForUsersMetrics(1);
|
||||||
|
long writeMetaMetricForUserFoo =
|
||||||
|
getMetaMetrics().getWriteRequestCount() - writeMetaMetricBeforeNextuser;
|
||||||
|
long readMetaMetricBeforeNextuser = getMetaMetrics().getReadRequestCount();
|
||||||
|
userBar.runAs(new PrivilegedAction<Void>() {
|
||||||
|
@Override public Void run() {
|
||||||
|
try {
|
||||||
|
doGet();
|
||||||
|
} catch (IOException e) {
|
||||||
|
Assert.fail("Exception:" + e.getMessage());
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
waitForUsersMetrics(2);
|
||||||
|
long readMetaMetricForUserBar =
|
||||||
|
getMetaMetrics().getReadRequestCount() - readMetaMetricBeforeNextuser;
|
||||||
|
long filteredMetaReqeust = getMetaMetrics().getFilteredReadRequestCount();
|
||||||
|
userTest.runAs(new PrivilegedAction<Void>() {
|
||||||
|
@Override public Void run() {
|
||||||
|
try {
|
||||||
|
Table table = createConnection(UTIL.getConfiguration()).getTable(TABLE_NAME);
|
||||||
|
for (Result result : table.getScanner(new Scan().setFilter(new FilterAllFilter()))) {
|
||||||
|
Assert.fail("Should have filtered all rows");
|
||||||
|
}
|
||||||
|
} catch (IOException e) {
|
||||||
|
Assert.fail("Exception:" + e.getMessage());
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
waitForUsersMetrics(3);
|
||||||
|
long filteredMetaReqeustForTestUser =
|
||||||
|
getMetaMetrics().getFilteredReadRequestCount() - filteredMetaReqeust;
|
||||||
|
Map<byte[], UserMetrics> userMap =
|
||||||
|
ADMIN.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)).getLiveServerMetrics().values()
|
||||||
|
.iterator().next().getUserMetrics();
|
||||||
|
for (byte[] user : userMap.keySet()) {
|
||||||
|
switch (Bytes.toString(user)) {
|
||||||
|
case "FOO_USER_METRIC_TEST":
|
||||||
|
Assert.assertEquals(1,
|
||||||
|
userMap.get(user).getWriteRequestCount() - writeMetaMetricForUserFoo);
|
||||||
|
break;
|
||||||
|
case "BAR_USER_METRIC_TEST":
|
||||||
|
Assert
|
||||||
|
.assertEquals(1, userMap.get(user).getReadRequestCount() - readMetaMetricForUserBar);
|
||||||
|
Assert.assertEquals(0, userMap.get(user).getWriteRequestCount());
|
||||||
|
break;
|
||||||
|
case "TEST_USER_METRIC_TEST":
|
||||||
|
Assert.assertEquals(1,
|
||||||
|
userMap.get(user).getFilteredReadRequests() - filteredMetaReqeustForTestUser);
|
||||||
|
Assert.assertEquals(0, userMap.get(user).getWriteRequestCount());
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
//current user
|
||||||
|
Assert.assertEquals(UserProvider.instantiate(conf).getCurrent().getName(),
|
||||||
|
Bytes.toString(user));
|
||||||
|
//Read/write count because of Meta operations
|
||||||
|
Assert.assertTrue(userMap.get(user).getReadRequestCount() > 1);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
UTIL.deleteTable(TABLE_NAME);
|
||||||
|
}
|
||||||
|
|
||||||
|
private RegionMetrics getMetaMetrics() throws IOException {
|
||||||
|
for (ServerMetrics serverMetrics : ADMIN.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS))
|
||||||
|
.getLiveServerMetrics().values()) {
|
||||||
|
RegionMetrics metaMetrics = serverMetrics.getRegionMetrics()
|
||||||
|
.get(RegionInfoBuilder.FIRST_META_REGIONINFO.getRegionName());
|
||||||
|
if (metaMetrics != null) {
|
||||||
|
return metaMetrics;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Assert.fail("Should have find meta metrics");
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void waitForUsersMetrics(int noOfUsers) throws Exception {
|
||||||
|
//Sleep for metrics to get updated on master
|
||||||
|
Thread.sleep(5000);
|
||||||
|
Waiter.waitFor(CLUSTER.getConfiguration(), 10 * 1000, 100, new Predicate<Exception>() {
|
||||||
|
@Override public boolean evaluate() throws Exception {
|
||||||
|
Map<byte[], UserMetrics> metrics =
|
||||||
|
ADMIN.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)).getLiveServerMetrics().values()
|
||||||
|
.iterator().next().getUserMetrics();
|
||||||
|
Assert.assertNotNull(metrics);
|
||||||
|
//including current user + noOfUsers
|
||||||
|
return metrics.keySet().size() > noOfUsers;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private void doPut() throws IOException {
|
||||||
|
try (Connection conn = createConnection(UTIL.getConfiguration())) {
|
||||||
|
Table table = conn.getTable(TABLE_NAME);
|
||||||
|
table
|
||||||
|
.put(new Put(Bytes.toBytes("a")).addColumn(CF, Bytes.toBytes("col1"), Bytes.toBytes("1")));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void doGet() throws IOException {
|
||||||
|
try (Connection conn = createConnection(UTIL.getConfiguration())) {
|
||||||
|
Table table = conn.getTable(TABLE_NAME);
|
||||||
|
table.get(new Get(Bytes.toBytes("a")).addColumn(CF, Bytes.toBytes("col1")));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private Connection createConnection(Configuration conf) throws IOException {
|
||||||
|
User user = UserProvider.instantiate(conf).getCurrent();
|
||||||
|
return ConnectionFactory.createConnection(conf, user);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testOtherStatusInfos() throws Exception {
|
public void testOtherStatusInfos() throws Exception {
|
||||||
EnumSet<Option> options =
|
EnumSet<Option> options =
|
||||||
|
|
|
@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.ServerName;
|
||||||
import org.apache.hadoop.hbase.Size;
|
import org.apache.hadoop.hbase.Size;
|
||||||
import org.apache.hadoop.hbase.Stoppable;
|
import org.apache.hadoop.hbase.Stoppable;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.UserMetrics;
|
||||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||||
import org.apache.hadoop.hbase.client.RegionStatesCount;
|
import org.apache.hadoop.hbase.client.RegionStatesCount;
|
||||||
import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
|
import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
|
||||||
|
@ -356,6 +357,10 @@ public class TestRegionsRecoveryChore {
|
||||||
return regionMetricsMap;
|
return regionMetricsMap;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override public Map<byte[], UserMetrics> getUserMetrics() {
|
||||||
|
return new HashMap<>();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Set<String> getCoprocessorNames() {
|
public Set<String> getCoprocessorNames() {
|
||||||
return null;
|
return null;
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.regionserver;
|
package org.apache.hadoop.hbase.regionserver;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.CompatibilityFactory;
|
import org.apache.hadoop.hbase.CompatibilityFactory;
|
||||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
import org.apache.hadoop.hbase.test.MetricsAssertHelper;
|
import org.apache.hadoop.hbase.test.MetricsAssertHelper;
|
||||||
|
@ -38,7 +39,7 @@ public class TestMetricsRegion {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testRegionWrapperMetrics() {
|
public void testRegionWrapperMetrics() {
|
||||||
MetricsRegion mr = new MetricsRegion(new MetricsRegionWrapperStub());
|
MetricsRegion mr = new MetricsRegion(new MetricsRegionWrapperStub(), new Configuration());
|
||||||
MetricsRegionAggregateSource agg = mr.getSource().getAggregateSource();
|
MetricsRegionAggregateSource agg = mr.getSource().getAggregateSource();
|
||||||
|
|
||||||
HELPER.assertGauge(
|
HELPER.assertGauge(
|
||||||
|
@ -72,7 +73,7 @@ public class TestMetricsRegion {
|
||||||
mr.close();
|
mr.close();
|
||||||
|
|
||||||
// test region with replica id > 0
|
// test region with replica id > 0
|
||||||
mr = new MetricsRegion(new MetricsRegionWrapperStub(1));
|
mr = new MetricsRegion(new MetricsRegionWrapperStub(1), new Configuration());
|
||||||
agg = mr.getSource().getAggregateSource();
|
agg = mr.getSource().getAggregateSource();
|
||||||
HELPER.assertGauge(
|
HELPER.assertGauge(
|
||||||
"namespace_TestNS_table_MetricsRegionWrapperStub_region_DEADBEEF001_metric_storeCount",
|
"namespace_TestNS_table_MetricsRegionWrapperStub_region_DEADBEEF001_metric_storeCount",
|
||||||
|
|
Loading…
Reference in New Issue