HBASE-23065 [hbtop] Top-N heavy hitter user and client drill downs

Signed-off-by: Toshihiro Suzuki <brfrn169@gmail.com>
Signed-off-by: Josh Elser <elserj@apache.org>
Signed-off-by: Andrew Purtell <apurtell@apache.org>
This commit is contained in:
Ankit Singhal 2019-11-17 15:07:52 -08:00 committed by Toshihiro Suzuki
parent eee9480cb4
commit 29d1a97482
40 changed files with 1359 additions and 109 deletions

View File

@ -93,6 +93,11 @@ public interface ServerMetrics {
*/
Map<byte[], RegionMetrics> getRegionMetrics();
/**
* @return metrics per user
*/
Map<byte[], UserMetrics> getUserMetrics();
/**
* Return the RegionServer-level and Region-level coprocessors
* @return string set of loaded RegionServer-level and Region-level coprocessors

View File

@ -77,6 +77,8 @@ public final class ServerMetricsBuilder {
.map(HBaseProtos.Coprocessor::getName).collect(Collectors.toList()))
.setRegionMetrics(serverLoadPB.getRegionLoadsList().stream()
.map(RegionMetricsBuilder::toRegionMetrics).collect(Collectors.toList()))
.setUserMetrics(serverLoadPB.getUserLoadsList().stream()
.map(UserMetricsBuilder::toUserMetrics).collect(Collectors.toList()))
.setReplicationLoadSources(serverLoadPB.getReplLoadSourceList().stream()
.map(ProtobufUtil::toReplicationLoadSource).collect(Collectors.toList()))
.setReplicationLoadSink(serverLoadPB.hasReplLoadSink()
@ -100,19 +102,19 @@ public final class ServerMetricsBuilder {
.setInfoServerPort(metrics.getInfoServerPort())
.setMaxHeapMB((int) metrics.getMaxHeapSize().get(Size.Unit.MEGABYTE))
.setUsedHeapMB((int) metrics.getUsedHeapSize().get(Size.Unit.MEGABYTE))
.addAllCoprocessors(toCoprocessor(metrics.getCoprocessorNames()))
.addAllRegionLoads(metrics.getRegionMetrics().values().stream()
.map(RegionMetricsBuilder::toRegionLoad)
.collect(Collectors.toList()))
.addAllReplLoadSource(metrics.getReplicationLoadSourceList().stream()
.map(ProtobufUtil::toReplicationLoadSource)
.collect(Collectors.toList()))
.addAllCoprocessors(toCoprocessor(metrics.getCoprocessorNames())).addAllRegionLoads(
metrics.getRegionMetrics().values().stream().map(RegionMetricsBuilder::toRegionLoad)
.collect(Collectors.toList())).addAllUserLoads(
metrics.getUserMetrics().values().stream().map(UserMetricsBuilder::toUserMetrics)
.collect(Collectors.toList())).addAllReplLoadSource(
metrics.getReplicationLoadSourceList().stream()
.map(ProtobufUtil::toReplicationLoadSource).collect(Collectors.toList()))
.setReportStartTime(metrics.getLastReportTimestamp())
.setReportEndTime(metrics.getReportTimestamp());
if (metrics.getReplicationLoadSink() != null) {
builder.setReplLoadSink(ProtobufUtil.toReplicationLoadSink(
metrics.getReplicationLoadSink()));
builder.setReplLoadSink(ProtobufUtil.toReplicationLoadSink(metrics.getReplicationLoadSink()));
}
return builder.build();
}
@ -132,6 +134,7 @@ public final class ServerMetricsBuilder {
@Nullable
private ReplicationLoadSink sink = null;
private final Map<byte[], RegionMetrics> regionStatus = new TreeMap<>(Bytes.BYTES_COMPARATOR);
private final Map<byte[], UserMetrics> userMetrics = new TreeMap<>(Bytes.BYTES_COMPARATOR);
private final Set<String> coprocessorNames = new TreeSet<>();
private long reportTimestamp = System.currentTimeMillis();
private long lastReportTimestamp = 0;
@ -189,6 +192,11 @@ public final class ServerMetricsBuilder {
return this;
}
public ServerMetricsBuilder setUserMetrics(List<UserMetrics> value) {
value.forEach(v -> this.userMetrics.put(v.getUserName(), v));
return this;
}
public ServerMetricsBuilder setCoprocessorNames(List<String> value) {
coprocessorNames.addAll(value);
return this;
@ -219,7 +227,8 @@ public final class ServerMetricsBuilder {
regionStatus,
coprocessorNames,
reportTimestamp,
lastReportTimestamp);
lastReportTimestamp,
userMetrics);
}
private static class ServerMetricsImpl implements ServerMetrics {
@ -238,12 +247,13 @@ public final class ServerMetricsBuilder {
private final Set<String> coprocessorNames;
private final long reportTimestamp;
private final long lastReportTimestamp;
private final Map<byte[], UserMetrics> userMetrics;
ServerMetricsImpl(ServerName serverName, int versionNumber, String version,
long requestCountPerSecond, long requestCount, Size usedHeapSize, Size maxHeapSize,
int infoServerPort, List<ReplicationLoadSource> sources, ReplicationLoadSink sink,
Map<byte[], RegionMetrics> regionStatus, Set<String> coprocessorNames, long reportTimestamp,
long lastReportTimestamp) {
long lastReportTimestamp, Map<byte[], UserMetrics> userMetrics) {
this.serverName = Preconditions.checkNotNull(serverName);
this.versionNumber = versionNumber;
this.version = version;
@ -255,6 +265,7 @@ public final class ServerMetricsBuilder {
this.sources = Preconditions.checkNotNull(sources);
this.sink = sink;
this.regionStatus = Preconditions.checkNotNull(regionStatus);
this.userMetrics = Preconditions.checkNotNull(userMetrics);
this.coprocessorNames =Preconditions.checkNotNull(coprocessorNames);
this.reportTimestamp = reportTimestamp;
this.lastReportTimestamp = lastReportTimestamp;
@ -324,6 +335,11 @@ public final class ServerMetricsBuilder {
return Collections.unmodifiableMap(regionStatus);
}
@Override
public Map<byte[], UserMetrics> getUserMetrics() {
return Collections.unmodifiableMap(userMetrics);
}
@Override
public Set<String> getCoprocessorNames() {
return Collections.unmodifiableSet(coprocessorNames);

View File

@ -0,0 +1,86 @@
/**
* Copyright The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;
import java.util.Map;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
/**
* Encapsulates per-user load metrics.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public interface UserMetrics {
interface ClientMetrics {
String getHostName();
long getReadRequestsCount();
long getWriteRequestsCount();
long getFilteredReadRequestsCount();
}
/**
* @return the user name
*/
byte[] getUserName();
/**
* @return the number of read requests made by user
*/
long getReadRequestCount();
/**
* @return the number of write requests made by user
*/
long getWriteRequestCount();
/**
* @return the number of write requests and read requests and coprocessor
* service requests made by the user
*/
default long getRequestCount() {
return getReadRequestCount() + getWriteRequestCount();
}
/**
* @return the user name as a string
*/
default String getNameAsString() {
return Bytes.toStringBinary(getUserName());
}
/**
* @return metrics per client(hostname)
*/
Map<String, ClientMetrics> getClientMetrics();
/**
* @return count of filtered read requests for a user
*/
long getFilteredReadRequests();
}

View File

@ -0,0 +1,151 @@
/**
* Copyright The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.hbase.util.Strings;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
@InterfaceAudience.Private
public final class UserMetricsBuilder {
public static UserMetrics toUserMetrics(ClusterStatusProtos.UserLoad userLoad) {
UserMetricsBuilder builder = UserMetricsBuilder.newBuilder(userLoad.getUserName().getBytes());
userLoad.getClientMetricsList().stream().map(
clientMetrics -> new ClientMetricsImpl(clientMetrics.getHostName(),
clientMetrics.getReadRequestsCount(), clientMetrics.getWriteRequestsCount(),
clientMetrics.getFilteredRequestsCount())).forEach(builder::addClientMetris);
return builder.build();
}
public static ClusterStatusProtos.UserLoad toUserMetrics(UserMetrics userMetrics) {
ClusterStatusProtos.UserLoad.Builder builder =
ClusterStatusProtos.UserLoad.newBuilder().setUserName(userMetrics.getNameAsString());
userMetrics.getClientMetrics().values().stream().map(
clientMetrics -> ClusterStatusProtos.ClientMetrics.newBuilder()
.setHostName(clientMetrics.getHostName())
.setWriteRequestsCount(clientMetrics.getWriteRequestsCount())
.setReadRequestsCount(clientMetrics.getReadRequestsCount())
.setFilteredRequestsCount(clientMetrics.getFilteredReadRequestsCount()).build())
.forEach(builder::addClientMetrics);
return builder.build();
}
public static UserMetricsBuilder newBuilder(byte[] name) {
return new UserMetricsBuilder(name);
}
private final byte[] name;
private Map<String, UserMetrics.ClientMetrics> clientMetricsMap = new HashMap<>();
private UserMetricsBuilder(byte[] name) {
this.name = name;
}
public UserMetricsBuilder addClientMetris(UserMetrics.ClientMetrics clientMetrics) {
clientMetricsMap.put(clientMetrics.getHostName(), clientMetrics);
return this;
}
public UserMetrics build() {
return new UserMetricsImpl(name, clientMetricsMap);
}
public static class ClientMetricsImpl implements UserMetrics.ClientMetrics {
private final long filteredReadRequestsCount;
private final String hostName;
private final long readRequestCount;
private final long writeRequestCount;
public ClientMetricsImpl(String hostName, long readRequest, long writeRequest,
long filteredReadRequestsCount) {
this.hostName = hostName;
this.readRequestCount = readRequest;
this.writeRequestCount = writeRequest;
this.filteredReadRequestsCount = filteredReadRequestsCount;
}
@Override public String getHostName() {
return hostName;
}
@Override public long getReadRequestsCount() {
return readRequestCount;
}
@Override public long getWriteRequestsCount() {
return writeRequestCount;
}
@Override public long getFilteredReadRequestsCount() {
return filteredReadRequestsCount;
}
}
private static class UserMetricsImpl implements UserMetrics {
private final byte[] name;
private final Map<String, ClientMetrics> clientMetricsMap;
UserMetricsImpl(byte[] name, Map<String, ClientMetrics> clientMetricsMap) {
this.name = Preconditions.checkNotNull(name);
this.clientMetricsMap = clientMetricsMap;
}
@Override public byte[] getUserName() {
return name;
}
@Override public long getReadRequestCount() {
return clientMetricsMap.values().stream().map(c -> c.getReadRequestsCount())
.reduce(0L, Long::sum);
}
@Override public long getWriteRequestCount() {
return clientMetricsMap.values().stream().map(c -> c.getWriteRequestsCount())
.reduce(0L, Long::sum);
}
@Override public Map<String, ClientMetrics> getClientMetrics() {
return this.clientMetricsMap;
}
@Override public long getFilteredReadRequests() {
return clientMetricsMap.values().stream().map(c -> c.getFilteredReadRequestsCount())
.reduce(0L, Long::sum);
}
@Override
public String toString() {
StringBuilder sb = Strings
.appendKeyValue(new StringBuilder(), "readRequestCount", this.getReadRequestCount());
Strings.appendKeyValue(sb, "writeRequestCount", this.getWriteRequestCount());
Strings.appendKeyValue(sb, "filteredReadRequestCount", this.getFilteredReadRequests());
return sb.toString();
}
}
}

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.hbase.regionserver;
import java.util.Map;
import org.apache.hadoop.hbase.metrics.BaseSource;
import org.apache.yetus.audience.InterfaceAudience;
@ -59,4 +61,6 @@ public interface MetricsUserAggregateSource extends BaseSource {
MetricsUserSource getOrCreateMetricsUser(String user);
void deregister(MetricsUserSource toRemove);
Map<String, MetricsUserSource> getUserSources();
}

View File

@ -18,11 +18,31 @@
package org.apache.hadoop.hbase.regionserver;
import java.util.Map;
import org.apache.hadoop.metrics2.MetricsCollector;
import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
public interface MetricsUserSource extends Comparable<MetricsUserSource> {
//These client metrics will be reported through clusterStatus and hbtop only
interface ClientMetrics {
void incrementReadRequest();
void incrementWriteRequest();
String getHostName();
long getReadRequestsCount();
long getWriteRequestsCount();
void incrementFilteredReadRequests();
long getFilteredReadRequests();
}
String getUser();
void register();
@ -42,4 +62,21 @@ public interface MetricsUserSource extends Comparable<MetricsUserSource> {
void updateReplay(long t);
void updateScanTime(long t);
void getMetrics(MetricsCollector metricsCollector, boolean all);
/**
* Metrics collected at client level for a user(needed for reporting through clusterStatus
* and hbtop currently)
* @return metrics per hostname
*/
Map<String, ClientMetrics> getClientMetrics();
/**
* Create a instance of ClientMetrics if not present otherwise return the previous one
*
* @param hostName hostname of the client
* @return Instance of ClientMetrics
*/
ClientMetrics getOrCreateMetricsClient(String hostName);
}

View File

@ -47,4 +47,10 @@ public interface MetricHistogram {
*/
void add(long value);
/**
* Return the total number of values added to the histogram.
* @return the total number of values.
*/
long getCount();
}

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.hbase.regionserver;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.hbase.metrics.BaseSourceImpl;
@ -28,8 +30,6 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
@InterfaceAudience.Private
public class MetricsUserAggregateSourceImpl extends BaseSourceImpl
implements MetricsUserAggregateSource {
@ -90,9 +90,9 @@ public class MetricsUserAggregateSourceImpl extends BaseSourceImpl
}
}
@VisibleForTesting
public ConcurrentHashMap<String, MetricsUserSource> getUserSources() {
return userSources;
@Override
public Map<String, MetricsUserSource> getUserSources() {
return Collections.unmodifiableMap(userSources);
}
@Override

View File

@ -18,9 +18,14 @@
package org.apache.hadoop.hbase.regionserver;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.LongAdder;
import org.apache.hadoop.metrics2.MetricHistogram;
import org.apache.hadoop.metrics2.MetricsCollector;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.lib.DynamicMetricsRegistry;
import org.apache.yetus.audience.InterfaceAudience;
@ -57,6 +62,48 @@ public class MetricsUserSourceImpl implements MetricsUserSource {
private final MetricsUserAggregateSourceImpl agg;
private final DynamicMetricsRegistry registry;
private ConcurrentHashMap<String, ClientMetrics> clientMetricsMap;
static class ClientMetricsImpl implements ClientMetrics {
private final String hostName;
final LongAdder readRequestsCount = new LongAdder();
final LongAdder writeRequestsCount = new LongAdder();
final LongAdder filteredRequestsCount = new LongAdder();
public ClientMetricsImpl(String hostName) {
this.hostName = hostName;
}
@Override public void incrementReadRequest() {
readRequestsCount.increment();
}
@Override public void incrementWriteRequest() {
writeRequestsCount.increment();
}
@Override public String getHostName() {
return hostName;
}
@Override public long getReadRequestsCount() {
return readRequestsCount.sum();
}
@Override public long getWriteRequestsCount() {
return writeRequestsCount.sum();
}
@Override public void incrementFilteredReadRequests() {
filteredRequestsCount.increment();
}
@Override public long getFilteredReadRequests() {
return filteredRequestsCount.sum();
}
}
public MetricsUserSourceImpl(String user, MetricsUserAggregateSourceImpl agg) {
if (LOG.isDebugEnabled()) {
LOG.debug("Creating new MetricsUserSourceImpl for user " + user);
@ -77,7 +124,7 @@ public class MetricsUserSourceImpl implements MetricsUserSource {
userIncrementKey = userNamePrefix + MetricsRegionServerSource.INCREMENT_KEY;
userAppendKey = userNamePrefix + MetricsRegionServerSource.APPEND_KEY;
userReplayKey = userNamePrefix + MetricsRegionServerSource.REPLAY_KEY;
clientMetricsMap = new ConcurrentHashMap<>();
agg.register(this);
}
@ -204,4 +251,27 @@ public class MetricsUserSourceImpl implements MetricsUserSource {
public void updateScanTime(long t) {
scanTimeHisto.add(t);
}
@Override public void getMetrics(MetricsCollector metricsCollector, boolean all) {
MetricsRecordBuilder mrb = metricsCollector.addRecord(this.userNamePrefix);
registry.snapshot(mrb, all);
}
@Override public Map<String, ClientMetrics> getClientMetrics() {
return Collections.unmodifiableMap(clientMetricsMap);
}
@Override public ClientMetrics getOrCreateMetricsClient(String client) {
ClientMetrics source = clientMetricsMap.get(client);
if (source != null) {
return source;
}
source = new ClientMetricsImpl(client);
ClientMetrics prev = clientMetricsMap.putIfAbsent(client, source);
if (prev != null) {
return prev;
}
return source;
}
}

View File

@ -51,6 +51,10 @@ public class MutableHistogram extends MutableMetric implements MetricHistogram {
histogram.update(val);
}
@Override public long getCount() {
return histogram.getCount();
}
public long getMax() {
return histogram.getMax();
}

View File

@ -83,5 +83,9 @@ public abstract class MutableRangeHistogram extends MutableHistogram implements
Interns.info(name + "_" + rangeType + "_" + ranges[ranges.length - 1] + "-inf", desc),
val - cumNum);
}
}
}
@Override public long getCount() {
return histogram.getCount();
}
}

View File

@ -143,6 +143,10 @@ public final class RecordFilter {
this.value = Objects.requireNonNull(value);
}
public Field getField() {
return field;
}
public boolean execute(Record record) {
FieldValue fieldValue = record.get(field);
if (fieldValue == null) {

View File

@ -57,7 +57,11 @@ public enum Field {
FieldValueType.STRING),
REGION_COUNT("#REGION", "Region Count", false, false, FieldValueType.INTEGER),
USED_HEAP_SIZE("UHEAP", "Used Heap Size", false, false, FieldValueType.SIZE),
MAX_HEAP_SIZE("MHEAP", "Max Heap Size", false, false, FieldValueType.SIZE);
USER("USER", "user Name", true, true, FieldValueType.STRING),
MAX_HEAP_SIZE("MHEAP", "Max Heap Size", false, false, FieldValueType.SIZE),
CLIENT_COUNT("#CLIENT", "Client Count", false, false, FieldValueType.INTEGER),
USER_COUNT("#USER", "User Count", false, false, FieldValueType.INTEGER),
CLIENT("CLIENT", "Client Hostname", true, true, FieldValueType.STRING);
private final String header;
private final String description;

View File

@ -0,0 +1,157 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.hbtop.mode;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.ServerMetrics;
import org.apache.hadoop.hbase.UserMetrics;
import org.apache.hadoop.hbase.hbtop.Record;
import org.apache.hadoop.hbase.hbtop.RecordFilter;
import org.apache.hadoop.hbase.hbtop.field.Field;
import org.apache.hadoop.hbase.hbtop.field.FieldInfo;
import org.apache.hadoop.hbase.hbtop.field.FieldValue;
import org.apache.hadoop.hbase.hbtop.field.FieldValueType;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Implementation for {@link ModeStrategy} for client Mode.
*/
@InterfaceAudience.Private public final class ClientModeStrategy implements ModeStrategy {
private final List<FieldInfo> fieldInfos = Arrays
.asList(new FieldInfo(Field.CLIENT, 0, true),
new FieldInfo(Field.USER_COUNT, 5, true),
new FieldInfo(Field.REQUEST_COUNT_PER_SECOND, 10, true),
new FieldInfo(Field.READ_REQUEST_COUNT_PER_SECOND, 10, true),
new FieldInfo(Field.WRITE_REQUEST_COUNT_PER_SECOND, 10, true),
new FieldInfo(Field.FILTERED_READ_REQUEST_COUNT_PER_SECOND, 10, true));
private final Map<String, RequestCountPerSecond> requestCountPerSecondMap = new HashMap<>();
ClientModeStrategy() {
}
@Override public List<FieldInfo> getFieldInfos() {
return fieldInfos;
}
@Override public Field getDefaultSortField() {
return Field.REQUEST_COUNT_PER_SECOND;
}
@Override public List<Record> getRecords(ClusterMetrics clusterMetrics,
List<RecordFilter> pushDownFilters) {
List<Record> records = createRecords(clusterMetrics);
return aggregateRecordsAndAddDistinct(
ModeStrategyUtils.applyFilterAndGet(records, pushDownFilters), Field.CLIENT, Field.USER,
Field.USER_COUNT);
}
List<Record> createRecords(ClusterMetrics clusterMetrics) {
List<Record> ret = new ArrayList<>();
for (ServerMetrics serverMetrics : clusterMetrics.getLiveServerMetrics().values()) {
long lastReportTimestamp = serverMetrics.getLastReportTimestamp();
serverMetrics.getUserMetrics().values().forEach(um -> um.getClientMetrics().values().forEach(
clientMetrics -> ret.add(
createRecord(um.getNameAsString(), clientMetrics, lastReportTimestamp,
serverMetrics.getServerName().getServerName()))));
}
return ret;
}
/**
* Aggregate the records and count the unique values for the given distinctField
*
* @param records records to be processed
* @param groupBy Field on which group by needs to be done
* @param distinctField Field whose unique values needs to be counted
* @param uniqueCountAssignedTo a target field to which the unique count is assigned to
* @return aggregated records
*/
List<Record> aggregateRecordsAndAddDistinct(List<Record> records, Field groupBy,
Field distinctField, Field uniqueCountAssignedTo) {
List<Record> result = new ArrayList<>();
records.stream().collect(Collectors.groupingBy(r -> r.get(groupBy))).values()
.forEach(val -> {
Set<FieldValue> distinctValues = new HashSet<>();
Map<Field, FieldValue> map = new HashMap<>();
for (Record record : val) {
for (Map.Entry<Field, FieldValue> field : record.entrySet()) {
if (distinctField.equals(field.getKey())) {
//We will not be adding the field in the new record whose distinct count is required
distinctValues.add(record.get(distinctField));
} else {
if (field.getKey().getFieldValueType() == FieldValueType.STRING) {
map.put(field.getKey(), field.getValue());
} else {
if (map.get(field.getKey()) == null) {
map.put(field.getKey(), field.getValue());
} else {
map.put(field.getKey(), map.get(field.getKey()).plus(field.getValue()));
}
}
}
}
}
// Add unique count field
map.put(uniqueCountAssignedTo, uniqueCountAssignedTo.newValue(distinctValues.size()));
result.add(Record.ofEntries(map.entrySet().stream()
.map(k -> Record.entry(k.getKey(), k.getValue()))));
});
return result;
}
Record createRecord(String user, UserMetrics.ClientMetrics clientMetrics,
long lastReportTimestamp, String server) {
Record.Builder builder = Record.builder();
String client = clientMetrics.getHostName();
builder.put(Field.CLIENT, clientMetrics.getHostName());
String mapKey = client + "$" + user + "$" + server;
RequestCountPerSecond requestCountPerSecond = requestCountPerSecondMap.get(mapKey);
if (requestCountPerSecond == null) {
requestCountPerSecond = new RequestCountPerSecond();
requestCountPerSecondMap.put(mapKey, requestCountPerSecond);
}
requestCountPerSecond.refresh(lastReportTimestamp, clientMetrics.getReadRequestsCount(),
clientMetrics.getFilteredReadRequestsCount(), clientMetrics.getWriteRequestsCount());
builder.put(Field.REQUEST_COUNT_PER_SECOND, requestCountPerSecond.getRequestCountPerSecond());
builder.put(Field.READ_REQUEST_COUNT_PER_SECOND,
requestCountPerSecond.getReadRequestCountPerSecond());
builder.put(Field.WRITE_REQUEST_COUNT_PER_SECOND,
requestCountPerSecond.getWriteRequestCountPerSecond());
builder.put(Field.FILTERED_READ_REQUEST_COUNT_PER_SECOND,
requestCountPerSecond.getFilteredReadRequestCountPerSecond());
builder.put(Field.USER, user);
return builder.build();
}
@Override public DrillDownInfo drillDown(Record selectedRecord) {
List<RecordFilter> initialFilters = Collections.singletonList(
RecordFilter.newBuilder(Field.CLIENT).doubleEquals(selectedRecord.get(Field.CLIENT)));
return new DrillDownInfo(Mode.USER, initialFilters);
}
}

View File

@ -22,6 +22,7 @@ import java.util.List;
import java.util.Objects;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.hbtop.Record;
import org.apache.hadoop.hbase.hbtop.RecordFilter;
import org.apache.hadoop.hbase.hbtop.field.Field;
import org.apache.hadoop.hbase.hbtop.field.FieldInfo;
import org.apache.yetus.audience.InterfaceAudience;
@ -35,7 +36,9 @@ public enum Mode {
NAMESPACE("Namespace", "Record per Namespace", new NamespaceModeStrategy()),
TABLE("Table", "Record per Table", new TableModeStrategy()),
REGION("Region", "Record per Region", new RegionModeStrategy()),
REGION_SERVER("RegionServer", "Record per RegionServer", new RegionServerModeStrategy());
REGION_SERVER("RegionServer", "Record per RegionServer", new RegionServerModeStrategy()),
USER("User", "Record per user", new UserModeStrategy()),
CLIENT("Client", "Record per client", new ClientModeStrategy());
private final String header;
private final String description;
@ -55,8 +58,9 @@ public enum Mode {
return description;
}
public List<Record> getRecords(ClusterMetrics clusterMetrics) {
return modeStrategy.getRecords(clusterMetrics);
public List<Record> getRecords(ClusterMetrics clusterMetrics,
List<RecordFilter> pushDownFilters) {
return modeStrategy.getRecords(clusterMetrics, pushDownFilters);
}
public List<FieldInfo> getFieldInfos() {

View File

@ -21,6 +21,7 @@ import edu.umd.cs.findbugs.annotations.Nullable;
import java.util.List;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.hbtop.Record;
import org.apache.hadoop.hbase.hbtop.RecordFilter;
import org.apache.hadoop.hbase.hbtop.field.Field;
import org.apache.hadoop.hbase.hbtop.field.FieldInfo;
import org.apache.yetus.audience.InterfaceAudience;
@ -33,6 +34,6 @@ import org.apache.yetus.audience.InterfaceAudience;
interface ModeStrategy {
List<FieldInfo> getFieldInfos();
Field getDefaultSortField();
List<Record> getRecords(ClusterMetrics clusterMetrics);
List<Record> getRecords(ClusterMetrics clusterMetrics, List<RecordFilter> pushDownFilters);
@Nullable DrillDownInfo drillDown(Record selectedRecord);
}

View File

@ -0,0 +1,63 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.hbtop.mode;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.hbase.hbtop.Record;
import org.apache.hadoop.hbase.hbtop.RecordFilter;
import org.apache.hadoop.hbase.hbtop.field.Field;
public final class ModeStrategyUtils {
private ModeStrategyUtils() {
}
/**
* Filter records as per the supplied filters,
* @param records records to be processed
* @param filters List of filters
* @return filtered records
*/
public static List<Record> applyFilterAndGet(List<Record> records,
List<RecordFilter> filters) {
if (filters != null && !filters.isEmpty()) {
return records.stream().filter(r -> filters.stream().allMatch(f -> f.execute(r)))
.collect(Collectors.toList());
}
return records;
}
/**
* Group by records on the basis of supplied groupBy field and
* Aggregate records using {@link Record#combine(Record)}
*
* @param records records needs to be processed
* @param groupBy Field to be used for group by
* @return aggregated records
*/
public static List<Record> aggregateRecords(List<Record> records, Field groupBy) {
return records.stream().collect(Collectors.groupingBy(r -> r.get(groupBy))).entrySet().stream()
.flatMap(e -> e.getValue().stream().reduce(Record::combine).map(Stream::of)
.orElse(Stream.empty())).collect(Collectors.toList());
}
}

View File

@ -20,8 +20,7 @@ package org.apache.hadoop.hbase.hbtop.mode;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.hbtop.Record;
import org.apache.hadoop.hbase.hbtop.RecordFilter;
@ -64,27 +63,14 @@ public final class NamespaceModeStrategy implements ModeStrategy {
return Field.REQUEST_COUNT_PER_SECOND;
}
@Override
public List<Record> getRecords(ClusterMetrics clusterMetrics) {
@Override public List<Record> getRecords(ClusterMetrics clusterMetrics,
List<RecordFilter> pushDownFilters) {
// Get records from RegionModeStrategy and add REGION_COUNT field
List<Record> records = regionModeStrategy.getRecords(clusterMetrics).stream()
.map(record ->
Record.ofEntries(fieldInfos.stream()
.filter(fi -> record.containsKey(fi.getField()))
.map(fi -> Record.entry(fi.getField(), record.get(fi.getField())))))
.map(record -> Record.builder().putAll(record).put(Field.REGION_COUNT, 1).build())
.collect(Collectors.toList());
List<Record> records = regionModeStrategy.selectModeFieldsAndAddCountField(fieldInfos,
regionModeStrategy.getRecords(clusterMetrics, pushDownFilters), Field.REGION_COUNT);
// Aggregation by NAMESPACE field
return records.stream()
.collect(Collectors.groupingBy(r -> r.get(Field.NAMESPACE).asString()))
.entrySet().stream()
.flatMap(
e -> e.getValue().stream()
.reduce(Record::combine)
.map(Stream::of)
.orElse(Stream.empty()))
.collect(Collectors.toList());
return ModeStrategyUtils.aggregateRecords(records, Field.NAMESPACE);
}
@Override

View File

@ -24,6 +24,8 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.RegionMetrics;
@ -31,6 +33,7 @@ import org.apache.hadoop.hbase.ServerMetrics;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.hbtop.Record;
import org.apache.hadoop.hbase.hbtop.RecordFilter;
import org.apache.hadoop.hbase.hbtop.field.Field;
import org.apache.hadoop.hbase.hbtop.field.FieldInfo;
import org.apache.hadoop.hbase.util.Bytes;
@ -83,8 +86,8 @@ public final class RegionModeStrategy implements ModeStrategy {
return Field.REQUEST_COUNT_PER_SECOND;
}
@Override
public List<Record> getRecords(ClusterMetrics clusterMetrics) {
@Override public List<Record> getRecords(ClusterMetrics clusterMetrics,
List<RecordFilter> pushDownFilters) {
List<Record> ret = new ArrayList<>();
for (ServerMetrics sm : clusterMetrics.getLiveServerMetrics().values()) {
long lastReportTimestamp = sm.getLastReportTimestamp();
@ -174,6 +177,27 @@ public final class RegionModeStrategy implements ModeStrategy {
return builder.build();
}
/**
* Form new record list with records formed by only fields provided through fieldInfo and
* add a count field for each record with value 1
* We are doing two operation of selecting and adding new field
* because of saving some CPU cycles on rebuilding the record again
*
* @param fieldInfos List of FieldInfos required in the record
* @param records List of records which needs to be processed
* @param countField Field which needs to be added with value 1 for each record
* @return records after selecting required fields and adding count field
*/
List<Record> selectModeFieldsAndAddCountField(List<FieldInfo> fieldInfos, List<Record> records,
Field countField) {
return records.stream().map(record -> Record.ofEntries(
fieldInfos.stream().filter(fi -> record.containsKey(fi.getField()))
.map(fi -> Record.entry(fi.getField(), record.get(fi.getField())))))
.map(record -> Record.builder().putAll(record).put(countField, 1).build())
.collect(Collectors.toList());
}
@Nullable
@Override
public DrillDownInfo drillDown(Record selectedRecord) {

View File

@ -23,7 +23,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.ServerMetrics;
import org.apache.hadoop.hbase.hbtop.Record;
@ -70,27 +70,15 @@ public final class RegionServerModeStrategy implements ModeStrategy {
return Field.REQUEST_COUNT_PER_SECOND;
}
@Override
public List<Record> getRecords(ClusterMetrics clusterMetrics) {
@Override public List<Record> getRecords(ClusterMetrics clusterMetrics,
List<RecordFilter> pushDownFilters) {
// Get records from RegionModeStrategy and add REGION_COUNT field
List<Record> records = regionModeStrategy.getRecords(clusterMetrics).stream()
.map(record ->
Record.ofEntries(fieldInfos.stream()
.filter(fi -> record.containsKey(fi.getField()))
.map(fi -> Record.entry(fi.getField(), record.get(fi.getField())))))
.map(record -> Record.builder().putAll(record).put(Field.REGION_COUNT, 1).build())
.collect(Collectors.toList());
List<Record> records = regionModeStrategy.selectModeFieldsAndAddCountField(fieldInfos,
regionModeStrategy.getRecords(clusterMetrics, pushDownFilters), Field.REGION_COUNT);
// Aggregation by LONG_REGION_SERVER field
Map<String, Record> retMap = records.stream()
.collect(Collectors.groupingBy(r -> r.get(Field.LONG_REGION_SERVER).asString()))
.entrySet().stream()
.flatMap(
e -> e.getValue().stream()
.reduce(Record::combine)
.map(Stream::of)
.orElse(Stream.empty()))
.collect(Collectors.toMap(r -> r.get(Field.LONG_REGION_SERVER).asString(), r -> r));
Map<String, Record> retMap =
ModeStrategyUtils.aggregateRecords(records, Field.LONG_REGION_SERVER).stream()
.collect(Collectors.toMap(r -> r.get(Field.LONG_REGION_SERVER).asString(), r -> r));
// Add USED_HEAP_SIZE field and MAX_HEAP_SIZE field
for (ServerMetrics sm : clusterMetrics.getLiveServerMetrics().values()) {

View File

@ -65,16 +65,11 @@ public final class TableModeStrategy implements ModeStrategy {
return Field.REQUEST_COUNT_PER_SECOND;
}
@Override
public List<Record> getRecords(ClusterMetrics clusterMetrics) {
@Override public List<Record> getRecords(ClusterMetrics clusterMetrics,
List<RecordFilter> pushDownFilters) {
// Get records from RegionModeStrategy and add REGION_COUNT field
List<Record> records = regionModeStrategy.getRecords(clusterMetrics).stream()
.map(record ->
Record.ofEntries(fieldInfos.stream()
.filter(fi -> record.containsKey(fi.getField()))
.map(fi -> Record.entry(fi.getField(), record.get(fi.getField())))))
.map(record -> Record.builder().putAll(record).put(Field.REGION_COUNT, 1).build())
.collect(Collectors.toList());
List<Record> records = regionModeStrategy.selectModeFieldsAndAddCountField(fieldInfos,
regionModeStrategy.getRecords(clusterMetrics, pushDownFilters), Field.REGION_COUNT);
// Aggregation by NAMESPACE field and TABLE field
return records.stream()

View File

@ -0,0 +1,70 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.hbtop.mode;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.hbtop.Record;
import org.apache.hadoop.hbase.hbtop.RecordFilter;
import org.apache.hadoop.hbase.hbtop.field.Field;
import org.apache.hadoop.hbase.hbtop.field.FieldInfo;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Implementation for {@link ModeStrategy} for User Mode.
*/
@InterfaceAudience.Private public final class UserModeStrategy implements ModeStrategy {
private final List<FieldInfo> fieldInfos = Arrays
.asList(new FieldInfo(Field.USER, 0, true),
new FieldInfo(Field.CLIENT_COUNT, 7, true),
new FieldInfo(Field.REQUEST_COUNT_PER_SECOND, 10, true),
new FieldInfo(Field.READ_REQUEST_COUNT_PER_SECOND, 10, true),
new FieldInfo(Field.WRITE_REQUEST_COUNT_PER_SECOND, 10, true),
new FieldInfo(Field.FILTERED_READ_REQUEST_COUNT_PER_SECOND, 10, true));
private final ClientModeStrategy clientModeStrategy = new ClientModeStrategy();
UserModeStrategy() {
}
@Override public List<FieldInfo> getFieldInfos() {
return fieldInfos;
}
@Override public Field getDefaultSortField() {
return Field.REQUEST_COUNT_PER_SECOND;
}
@Override public List<Record> getRecords(ClusterMetrics clusterMetrics,
List<RecordFilter> pushDownFilters) {
List<Record> records = clientModeStrategy.createRecords(clusterMetrics);
return clientModeStrategy.aggregateRecordsAndAddDistinct(
ModeStrategyUtils.applyFilterAndGet(records, pushDownFilters), Field.USER, Field.CLIENT,
Field.CLIENT_COUNT);
}
@Override public DrillDownInfo drillDown(Record selectedRecord) {
//Drill down to client and using selected USER as a filter
List<RecordFilter> initialFilters = Collections.singletonList(
RecordFilter.newBuilder(Field.USER).doubleEquals(selectedRecord.get(Field.USER)));
return new DrillDownInfo(Mode.CLIENT, initialFilters);
}
}

View File

@ -17,12 +17,15 @@
*/
package org.apache.hadoop.hbase.hbtop.screen.top;
import static org.apache.commons.lang3.time.DateFormatUtils.ISO_8601_EXTENDED_TIME_FORMAT;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.hbtop.Record;
@ -56,6 +59,7 @@ public class TopScreenModel {
private List<Record> records;
private final List<RecordFilter> filters = new ArrayList<>();
private final List<RecordFilter> pushDownFilters = new ArrayList<>();
private final List<String> filterHistories = new ArrayList<>();
private boolean ascendingSort;
@ -88,6 +92,7 @@ public class TopScreenModel {
if (initialFilters != null) {
filters.addAll(initialFilters);
}
decomposePushDownFilter();
}
public void setSortFieldAndFields(Field sortField, List<Field> fields) {
@ -113,7 +118,7 @@ public class TopScreenModel {
}
private void refreshSummary(ClusterMetrics clusterMetrics) {
String currentTime = DateFormatUtils.ISO_8601_EXTENDED_TIME_FORMAT
String currentTime = ISO_8601_EXTENDED_TIME_FORMAT
.format(System.currentTimeMillis());
String version = clusterMetrics.getHBaseVersion();
String clusterId = clusterMetrics.getClusterId();
@ -130,7 +135,7 @@ public class TopScreenModel {
}
private void refreshRecords(ClusterMetrics clusterMetrics) {
List<Record> records = currentMode.getRecords(clusterMetrics);
List<Record> records = currentMode.getRecords(clusterMetrics, pushDownFilters);
// Filter and sort
records = records.stream()
@ -153,13 +158,13 @@ public class TopScreenModel {
if (filter == null) {
return false;
}
filters.add(filter);
filterHistories.add(filterString);
return true;
}
public void clearFilters() {
pushDownFilters.clear();
filters.clear();
}
@ -203,4 +208,18 @@ public class TopScreenModel {
public List<String> getFilterHistories() {
return Collections.unmodifiableList(filterHistories);
}
private void decomposePushDownFilter() {
pushDownFilters.clear();
for (RecordFilter filter : filters) {
if (!fields.contains(filter.getField())) {
pushDownFilters.add(filter);
}
}
filters.removeAll(pushDownFilters);
}
public Collection<? extends RecordFilter> getPushDownFilters() {
return pushDownFilters;
}
}

View File

@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.hadoop.hbase.hbtop.Record;
import org.apache.hadoop.hbase.hbtop.RecordFilter;
import org.apache.hadoop.hbase.hbtop.field.Field;
import org.apache.hadoop.hbase.hbtop.field.FieldInfo;
import org.apache.hadoop.hbase.hbtop.mode.Mode;
@ -324,7 +325,8 @@ public class TopScreenPresenter {
}
public ScreenView goToFilterDisplayMode(Screen screen, Terminal terminal, int row) {
return new FilterDisplayModeScreenView(screen, terminal, row, topScreenModel.getFilters(),
topScreenView);
ArrayList<RecordFilter> filters = new ArrayList<>(topScreenModel.getFilters());
filters.addAll(topScreenModel.getPushDownFilters());
return new FilterDisplayModeScreenView(screen, terminal, row, filters, topScreenView);
}
}

View File

@ -27,6 +27,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.ClusterMetricsBuilder;
@ -37,6 +38,8 @@ import org.apache.hadoop.hbase.ServerMetricsBuilder;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Size;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.UserMetrics;
import org.apache.hadoop.hbase.UserMetricsBuilder;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.hbtop.field.Field;
import org.apache.hadoop.hbase.hbtop.screen.top.Summary;
@ -54,6 +57,9 @@ public final class TestUtils {
// host1
List<RegionMetrics> regionMetricsList = new ArrayList<>();
List<UserMetrics> userMetricsList = new ArrayList<>();
userMetricsList.add(createUserMetrics("FOO",1,2, 4));
userMetricsList.add(createUserMetrics("BAR",2,3, 3));
regionMetricsList.add(createRegionMetrics(
"table1,,1.00000000000000000000000000000000.",
100, 50, 100,
@ -73,10 +79,13 @@ public final class TestUtils {
ServerName host1 = ServerName.valueOf("host1.apache.com", 1000, 1);
serverMetricsMap.put(host1, createServerMetrics(host1, 100,
new Size(100, Size.Unit.MEGABYTE), new Size(200, Size.Unit.MEGABYTE), 100,
regionMetricsList));
regionMetricsList, userMetricsList));
// host2
regionMetricsList.clear();
userMetricsList.clear();
userMetricsList.add(createUserMetrics("FOO",5,7, 3));
userMetricsList.add(createUserMetrics("BAR",4,8, 4));
regionMetricsList.add(createRegionMetrics(
"table1,1,4.00000000000000000000000000000003.",
100, 50, 100,
@ -96,7 +105,7 @@ public final class TestUtils {
ServerName host2 = ServerName.valueOf("host2.apache.com", 1001, 2);
serverMetricsMap.put(host2, createServerMetrics(host2, 200,
new Size(16, Size.Unit.GIGABYTE), new Size(32, Size.Unit.GIGABYTE), 200,
regionMetricsList));
regionMetricsList, userMetricsList));
ServerName host3 = ServerName.valueOf("host3.apache.com", 1002, 3);
return ClusterMetricsBuilder.newBuilder()
@ -117,6 +126,15 @@ public final class TestUtils {
.build();
}
private static UserMetrics createUserMetrics(String user, long readRequestCount,
long writeRequestCount, long filteredReadRequestsCount) {
return UserMetricsBuilder.newBuilder(Bytes.toBytes(user)).addClientMetris(
new UserMetricsBuilder.ClientMetricsImpl("CLIENT_A_" + user, readRequestCount,
writeRequestCount, filteredReadRequestsCount)).addClientMetris(
new UserMetricsBuilder.ClientMetricsImpl("CLIENT_B_" + user, readRequestCount,
writeRequestCount, filteredReadRequestsCount)).build();
}
private static RegionMetrics createRegionMetrics(String regionName, long readRequestCount,
long filteredReadRequestCount, long writeRequestCount, Size storeFileSize,
Size uncompressedStoreFileSize, int storeFileCount, Size memStoreSize, float locality,
@ -139,14 +157,15 @@ public final class TestUtils {
private static ServerMetrics createServerMetrics(ServerName serverName, long reportTimestamp,
Size usedHeapSize, Size maxHeapSize, long requestCountPerSecond,
List<RegionMetrics> regionMetricsList) {
List<RegionMetrics> regionMetricsList, List<UserMetrics> userMetricsList) {
return ServerMetricsBuilder.newBuilder(serverName)
.setReportTimestamp(reportTimestamp)
.setUsedHeapSize(usedHeapSize)
.setMaxHeapSize(maxHeapSize)
.setRequestCountPerSecond(requestCountPerSecond)
.setRegionMetrics(regionMetricsList).build();
.setRegionMetrics(regionMetricsList)
.setUserMetrics(userMetricsList).build();
}
public static void assertRecordsInRegionMode(List<Record> records) {
@ -316,10 +335,78 @@ public final class TestUtils {
}
}
public static void assertRecordsInUserMode(List<Record> records) {
assertThat(records.size(), is(2));
for (Record record : records) {
String user = record.get(Field.USER).asString();
switch (user) {
//readRequestPerSecond and writeRequestPerSecond will be zero
// because there is no change or new metrics during refresh
case "FOO":
assertRecordInUserMode(record, 0L, 0L, 0L);
break;
case "BAR":
assertRecordInUserMode(record, 0L, 0L, 0L);
break;
default:
fail();
}
}
}
public static void assertRecordsInClientMode(List<Record> records) {
assertThat(records.size(), is(4));
for (Record record : records) {
String client = record.get(Field.CLIENT).asString();
switch (client) {
//readRequestPerSecond and writeRequestPerSecond will be zero
// because there is no change or new metrics during refresh
case "CLIENT_A_FOO":
assertRecordInClientMode(record, 0L, 0L, 0L);
break;
case "CLIENT_A_BAR":
assertRecordInClientMode(record, 0L, 0L, 0L);
break;
case "CLIENT_B_FOO":
assertRecordInClientMode(record, 0L, 0L, 0L);
break;
case "CLIENT_B_BAR":
assertRecordInClientMode(record, 0L, 0L, 0L);
break;
default:
fail();
}
}
}
private static void assertRecordInUserMode(Record record, long readRequestCountPerSecond,
long writeCountRequestPerSecond, long filteredReadRequestsCount) {
assertThat(record.size(), is(6));
assertThat(record.get(Field.READ_REQUEST_COUNT_PER_SECOND).asLong(),
is(readRequestCountPerSecond));
assertThat(record.get(Field.WRITE_REQUEST_COUNT_PER_SECOND).asLong(),
is(writeCountRequestPerSecond));
assertThat(record.get(Field.FILTERED_READ_REQUEST_COUNT_PER_SECOND).asLong(),
is(filteredReadRequestsCount));
assertThat(record.get(Field.CLIENT_COUNT).asInt(), is(2));
}
private static void assertRecordInClientMode(Record record, long readRequestCountPerSecond,
long writeCountRequestPerSecond, long filteredReadRequestsCount) {
assertThat(record.size(), is(6));
assertThat(record.get(Field.READ_REQUEST_COUNT_PER_SECOND).asLong(),
is(readRequestCountPerSecond));
assertThat(record.get(Field.WRITE_REQUEST_COUNT_PER_SECOND).asLong(),
is(writeCountRequestPerSecond));
assertThat(record.get(Field.FILTERED_READ_REQUEST_COUNT_PER_SECOND).asLong(),
is(filteredReadRequestsCount));
assertThat(record.get(Field.USER_COUNT).asInt(), is(1));
}
private static void assertRecordInTableMode(Record record, long requestCountPerSecond,
long readRequestCountPerSecond, long filteredReadRequestCountPerSecond,
long writeCountRequestPerSecond, Size storeFileSize, Size uncompressedStoreFileSize,
int numStoreFiles, Size memStoreSize, int regionCount) {
long readRequestCountPerSecond, long filteredReadRequestCountPerSecond,
long writeCountRequestPerSecond, Size storeFileSize, Size uncompressedStoreFileSize,
int numStoreFiles, Size memStoreSize, int regionCount) {
assertThat(record.size(), is(11));
assertThat(record.get(Field.REQUEST_COUNT_PER_SECOND).asLong(),
is(requestCountPerSecond));

View File

@ -0,0 +1,72 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.hbtop.mode;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
import java.util.List;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.hbtop.Record;
import org.apache.hadoop.hbase.hbtop.TestUtils;
import org.apache.hadoop.hbase.hbtop.field.Field;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.ClassRule;
import org.junit.experimental.categories.Category;
@Category(SmallTests.class)
public class ClientModeTest extends ModeTestBase {
@ClassRule public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(ClientModeTest.class);
@Override protected Mode getMode() {
return Mode.CLIENT;
}
@Override protected void assertRecords(List<Record> records) {
TestUtils.assertRecordsInClientMode(records);
}
@Override protected void assertDrillDown(Record currentRecord, DrillDownInfo drillDownInfo) {
assertThat(drillDownInfo.getNextMode(), is(Mode.USER));
assertThat(drillDownInfo.getInitialFilters().size(), is(1));
String client = currentRecord.get(Field.CLIENT).asString();
switch (client) {
case "CLIENT_A_FOO":
assertThat(drillDownInfo.getInitialFilters().get(0).toString(), is("CLIENT==CLIENT_A_FOO"));
break;
case "CLIENT_B_FOO":
assertThat(drillDownInfo.getInitialFilters().get(0).toString(), is("CLIENT==CLIENT_B_FOO"));
break;
case "CLIENT_A_BAR":
assertThat(drillDownInfo.getInitialFilters().get(0).toString(), is("CLIENT==CLIENT_A_BAR"));
break;
case "CLIENT_B_BAR":
assertThat(drillDownInfo.getInitialFilters().get(0).toString(), is("CLIENT==CLIENT_B_BAR"));
break;
default:
fail();
}
}
}

View File

@ -27,7 +27,8 @@ public abstract class ModeTestBase {
@Test
public void testGetRecords() {
List<Record> records = getMode().getRecords(TestUtils.createDummyClusterMetrics());
List<Record> records = getMode().getRecords(TestUtils.createDummyClusterMetrics(),
null);
assertRecords(records);
}
@ -36,7 +37,8 @@ public abstract class ModeTestBase {
@Test
public void testDrillDown() {
List<Record> records = getMode().getRecords(TestUtils.createDummyClusterMetrics());
List<Record> records = getMode().getRecords(TestUtils.createDummyClusterMetrics(),
null);
for (Record record : records) {
assertDrillDown(record, getMode().drillDown(record));
}

View File

@ -0,0 +1,70 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.hbtop.mode;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
import java.util.List;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.hbtop.Record;
import org.apache.hadoop.hbase.hbtop.TestUtils;
import org.apache.hadoop.hbase.hbtop.field.Field;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.ClassRule;
import org.junit.experimental.categories.Category;
@Category(SmallTests.class)
public class UserModeTest extends ModeTestBase {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(UserModeTest.class);
@Override
protected Mode getMode() {
return Mode.USER;
}
@Override
protected void assertRecords(List<Record> records) {
TestUtils.assertRecordsInUserMode(records);
}
@Override
protected void assertDrillDown(Record currentRecord, DrillDownInfo drillDownInfo) {
assertThat(drillDownInfo.getNextMode(), is(Mode.CLIENT));
assertThat(drillDownInfo.getInitialFilters().size(), is(1));
String userName = currentRecord.get(Field.USER).asString();
switch (userName) {
case "FOO":
assertThat(drillDownInfo.getInitialFilters().get(0).toString(), is("USER==FOO"));
break;
case "BAR":
assertThat(drillDownInfo.getInitialFilters().get(0).toString(), is("USER==BAR"));
break;
default:
fail();
}
}
}

View File

@ -157,6 +157,29 @@ message RegionLoad {
optional int32 max_store_file_ref_count = 22 [default = 0];
}
message UserLoad {
/** short user name */
required string userName = 1;
/** Metrics for all clients of a user */
repeated ClientMetrics clientMetrics = 2;
}
message ClientMetrics {
/** client host name */
required string hostName = 1;
/** the current total read requests made from a client */
optional uint64 read_requests_count = 2;
/** the current total write requests made from a client */
optional uint64 write_requests_count = 3;
/** the current total filtered requests made from a client */
optional uint64 filtered_requests_count = 4;
}
/* Server-level protobufs */
message ReplicationLoadSink {
@ -230,6 +253,11 @@ message ServerLoad {
* The replicationLoadSink for the replication Sink status of this region server.
*/
optional ReplicationLoadSink replLoadSink = 11;
/**
* The metrics for each user on this region server
*/
repeated UserLoad userLoads = 12;
}
message LiveServerInfo {

View File

@ -153,6 +153,32 @@ message RegionLoad {
optional int32 max_store_file_ref_count = 22 [default = 0];
}
message UserLoad {
/** short user name */
required string userName = 1;
/** Metrics for all clients of a user */
repeated ClientMetrics clientMetrics = 2;
}
message ClientMetrics {
/** client host name */
required string hostName = 1;
/** the current total read requests made from a client */
optional uint64 read_requests_count = 2;
/** the current total write requests made from a client */
optional uint64 write_requests_count = 3;
/** the current total filtered requests made from a client */
optional uint64 filtered_requests_count = 4;
}
/* Server-level protobufs */
message ReplicationLoadSink {
@ -219,6 +245,11 @@ message ServerLoad {
* The replicationLoadSink for the replication Sink status of this region server.
*/
optional ReplicationLoadSink replLoadSink = 11;
/**
* The metrics for each user on this region server
*/
repeated UserLoad userLoads = 12;
}
message LiveServerInfo {

View File

@ -845,7 +845,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// TODO: revisit if coprocessors should load in other cases
this.coprocessorHost = new RegionCoprocessorHost(this, rsServices, conf);
this.metricsRegionWrapper = new MetricsRegionWrapperImpl(this);
this.metricsRegion = new MetricsRegion(this.metricsRegionWrapper);
this.metricsRegion = new MetricsRegion(this.metricsRegionWrapper, conf);
} else {
this.metricsRegionWrapper = null;
this.metricsRegion = null;
@ -6624,6 +6624,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
if (!outResults.isEmpty()) {
readRequestsCount.increment();
if (metricsRegion != null) {
metricsRegion.updateReadRequestCount();
}
}
if (rsServices != null && rsServices.getMetrics() != null) {
rsServices.getMetrics().updateReadQueryMeter(getRegionInfo().getTable());
@ -6951,6 +6954,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
protected void incrementCountOfRowsFilteredMetric(ScannerContext scannerContext) {
filteredReadRequestsCount.increment();
if (metricsRegion != null) {
metricsRegion.updateFilteredRecords();
}
if (scannerContext == null || !scannerContext.isTrackingMetrics()) return;

View File

@ -56,6 +56,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import javax.management.MalformedObjectNameException;
import javax.servlet.http.HttpServlet;
import org.apache.commons.lang3.RandomUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.SystemUtils;
@ -209,6 +210,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Coprocesso
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.UserLoad;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.Coprocessor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.Coprocessor.Builder;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair;
@ -1357,7 +1359,14 @@ public class HRegionServer extends HasThread implements
} else {
serverLoad.setInfoServerPort(-1);
}
MetricsUserAggregateSource userSource =
metricsRegionServer.getMetricsUserAggregate().getSource();
if (userSource != null) {
Map<String, MetricsUserSource> userMetricMap = userSource.getUserSources();
for (Entry<String, MetricsUserSource> entry : userMetricMap.entrySet()) {
serverLoad.addUserLoads(createUserLoad(entry.getKey(), entry.getValue()));
}
}
// for the replicationLoad purpose. Only need to get from one executorService
// either source or sink will get the same info
ReplicationSourceService rsources = getReplicationSourceService();
@ -1696,6 +1705,19 @@ public class HRegionServer extends HasThread implements
return regionLoadBldr.build();
}
private UserLoad createUserLoad(String user, MetricsUserSource userSource) {
UserLoad.Builder userLoadBldr = UserLoad.newBuilder();
userLoadBldr.setUserName(user);
userSource.getClientMetrics().values().stream().map(
clientMetrics -> ClusterStatusProtos.ClientMetrics.newBuilder()
.setHostName(clientMetrics.getHostName())
.setWriteRequestsCount(clientMetrics.getWriteRequestsCount())
.setFilteredRequestsCount(clientMetrics.getFilteredReadRequests())
.setReadRequestsCount(clientMetrics.getReadRequestsCount()).build())
.forEach(userLoadBldr::addClientMetrics);
return userLoadBldr.build();
}
public RegionLoad createRegionLoad(final String encodedRegionName) throws IOException {
HRegion r = onlineRegions.get(encodedRegionName);
return r != null ? createRegionLoad(r, null, null) : null;

View File

@ -18,6 +18,7 @@
*/
package org.apache.hadoop.hbase.regionserver;
import org.apache.hadoop.conf.Configuration;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
@ -29,12 +30,14 @@ import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
@InterfaceAudience.Private
public class MetricsRegion {
private final MetricsRegionSource source;
private final MetricsUserAggregate userAggregate;
private MetricsRegionWrapper regionWrapper;
public MetricsRegion(final MetricsRegionWrapper wrapper) {
public MetricsRegion(final MetricsRegionWrapper wrapper, Configuration conf) {
source = CompatibilitySingletonFactory.getInstance(MetricsRegionServerSourceFactory.class)
.createRegion(wrapper);
this.regionWrapper = wrapper;
userAggregate = MetricsUserAggregateFactory.getMetricsUserAggregate(conf);
}
public void close() {
@ -57,6 +60,9 @@ public class MetricsRegion {
source.updateScanTime(t);
}
public void updateFilteredRecords(){
userAggregate.updateFilteredReadRequests();
}
public void updateAppend() {
source.updateAppend();
}
@ -73,4 +79,7 @@ public class MetricsRegion {
return regionWrapper;
}
public void updateReadRequestCount() {
userAggregate.updateReadRequestCount();
}
}

View File

@ -101,7 +101,7 @@ public class MetricsRegionServer {
}
@VisibleForTesting
public org.apache.hadoop.hbase.regionserver.MetricsUserAggregate getMetricsUserAggregate() {
public MetricsUserAggregate getMetricsUserAggregate() {
return userAggregate;
}

View File

@ -23,6 +23,11 @@ import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
public interface MetricsUserAggregate {
/**
* @return return a singleton instance of MetricsUserAggregateSource or null in case of NoOp
*/
MetricsUserAggregateSource getSource();
void updatePut(long t);
void updateDelete(long t);
@ -36,4 +41,8 @@ public interface MetricsUserAggregate {
void updateReplay(long t);
void updateScanTime(long t);
void updateFilteredReadRequests();
void updateReadRequestCount();
}

View File

@ -18,7 +18,6 @@
package org.apache.hadoop.hbase.regionserver;
import org.apache.hadoop.conf.Configuration;
import org.apache.yetus.audience.InterfaceAudience;
@ -27,6 +26,7 @@ public class MetricsUserAggregateFactory {
private MetricsUserAggregateFactory() {
}
public static final String METRIC_USER_ENABLED_CONF = "hbase.regionserver.user.metrics.enabled";
public static final boolean DEFAULT_METRIC_USER_ENABLED_CONF = true;
@ -36,6 +36,10 @@ public class MetricsUserAggregateFactory {
} else {
//NoOpMetricUserAggregate
return new MetricsUserAggregate() {
@Override public MetricsUserAggregateSource getSource() {
return null;
}
@Override public void updatePut(long t) {
}
@ -63,6 +67,14 @@ public class MetricsUserAggregateFactory {
@Override public void updateScanTime(long t) {
}
@Override public void updateFilteredReadRequests() {
}
@Override public void updateReadRequestCount() {
}
};
}
}

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.net.InetAddress;
import java.util.Optional;
import org.apache.hadoop.conf.Configuration;
@ -29,8 +30,6 @@ import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.util.LossyCounting;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
@InterfaceAudience.Private
public class MetricsUserAggregateImpl implements MetricsUserAggregate{
@ -65,8 +64,8 @@ public class MetricsUserAggregateImpl implements MetricsUserAggregate{
return user.isPresent() ? user.get().getShortName() : null;
}
@VisibleForTesting
MetricsUserAggregateSource getSource() {
@Override
public MetricsUserAggregateSource getSource() {
return source;
}
@ -74,7 +73,39 @@ public class MetricsUserAggregateImpl implements MetricsUserAggregate{
public void updatePut(long t) {
String user = getActiveUser();
if (user != null) {
getOrCreateMetricsUser(user).updatePut(t);
MetricsUserSource userSource = getOrCreateMetricsUser(user);
userSource.updatePut(t);
incrementClientWriteMetrics(userSource);
}
}
private String getClient() {
Optional<InetAddress> ipOptional = RpcServer.getRemoteAddress();
if (ipOptional.isPresent()) {
return ipOptional.get().getHostName();
}
return null;
}
private void incrementClientReadMetrics(MetricsUserSource userSource) {
String client = getClient();
if (client != null && userSource != null) {
userSource.getOrCreateMetricsClient(client).incrementReadRequest();
}
}
private void incrementFilteredReadRequests(MetricsUserSource userSource) {
String client = getClient();
if (client != null && userSource != null) {
userSource.getOrCreateMetricsClient(client).incrementFilteredReadRequests();
}
}
private void incrementClientWriteMetrics(MetricsUserSource userSource) {
String client = getClient();
if (client != null && userSource != null) {
userSource.getOrCreateMetricsClient(client).incrementWriteRequest();
}
}
@ -82,7 +113,9 @@ public class MetricsUserAggregateImpl implements MetricsUserAggregate{
public void updateDelete(long t) {
String user = getActiveUser();
if (user != null) {
getOrCreateMetricsUser(user).updateDelete(t);
MetricsUserSource userSource = getOrCreateMetricsUser(user);
userSource.updateDelete(t);
incrementClientWriteMetrics(userSource);
}
}
@ -90,7 +123,8 @@ public class MetricsUserAggregateImpl implements MetricsUserAggregate{
public void updateGet(long t) {
String user = getActiveUser();
if (user != null) {
getOrCreateMetricsUser(user).updateGet(t);
MetricsUserSource userSource = getOrCreateMetricsUser(user);
userSource.updateGet(t);
}
}
@ -98,7 +132,9 @@ public class MetricsUserAggregateImpl implements MetricsUserAggregate{
public void updateIncrement(long t) {
String user = getActiveUser();
if (user != null) {
getOrCreateMetricsUser(user).updateIncrement(t);
MetricsUserSource userSource = getOrCreateMetricsUser(user);
userSource.updateIncrement(t);
incrementClientWriteMetrics(userSource);
}
}
@ -106,7 +142,9 @@ public class MetricsUserAggregateImpl implements MetricsUserAggregate{
public void updateAppend(long t) {
String user = getActiveUser();
if (user != null) {
getOrCreateMetricsUser(user).updateAppend(t);
MetricsUserSource userSource = getOrCreateMetricsUser(user);
userSource.updateAppend(t);
incrementClientWriteMetrics(userSource);
}
}
@ -114,7 +152,9 @@ public class MetricsUserAggregateImpl implements MetricsUserAggregate{
public void updateReplay(long t) {
String user = getActiveUser();
if (user != null) {
getOrCreateMetricsUser(user).updateReplay(t);
MetricsUserSource userSource = getOrCreateMetricsUser(user);
userSource.updateReplay(t);
incrementClientWriteMetrics(userSource);
}
}
@ -122,7 +162,24 @@ public class MetricsUserAggregateImpl implements MetricsUserAggregate{
public void updateScanTime(long t) {
String user = getActiveUser();
if (user != null) {
getOrCreateMetricsUser(user).updateScanTime(t);
MetricsUserSource userSource = getOrCreateMetricsUser(user);
userSource.updateScanTime(t);
}
}
@Override public void updateFilteredReadRequests() {
String user = getActiveUser();
if (user != null) {
MetricsUserSource userSource = getOrCreateMetricsUser(user);
incrementFilteredReadRequests(userSource);
}
}
@Override public void updateReadRequestCount() {
String user = getActiveUser();
if (user != null) {
MetricsUserSource userSource = getOrCreateMetricsUser(user);
incrementClientReadMetrics(userSource);
}
}

View File

@ -18,28 +18,40 @@
package org.apache.hadoop.hbase;
import java.io.IOException;
import java.security.PrivilegedAction;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterMetrics.Option;
import org.apache.hadoop.hbase.Waiter.Predicate;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.AsyncAdmin;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.RegionStatesCount;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor;
import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.MasterObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.filter.FilterAllFilter;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
@ -267,8 +279,7 @@ public class TestClientClusterMetrics {
UTIL.deleteTable(TABLE_NAME);
}
@Test
public void testMasterAndBackupMastersStatus() throws Exception {
@Test public void testMasterAndBackupMastersStatus() throws Exception {
// get all the master threads
List<MasterThread> masterThreads = CLUSTER.getMasterThreads();
int numActive = 0;
@ -293,6 +304,134 @@ public class TestClientClusterMetrics {
Assert.assertEquals(MASTERS - 1, metrics.getBackupMasterNames().size());
}
@Test public void testUserMetrics() throws Exception {
Configuration conf = UTIL.getConfiguration();
User userFoo = User.createUserForTesting(conf, "FOO_USER_METRIC_TEST", new String[0]);
User userBar = User.createUserForTesting(conf, "BAR_USER_METRIC_TEST", new String[0]);
User userTest = User.createUserForTesting(conf, "TEST_USER_METRIC_TEST", new String[0]);
UTIL.createTable(TABLE_NAME, CF);
waitForUsersMetrics(0);
long writeMetaMetricBeforeNextuser = getMetaMetrics().getWriteRequestCount();
userFoo.runAs(new PrivilegedAction<Void>() {
@Override public Void run() {
try {
doPut();
} catch (IOException e) {
Assert.fail("Exception:" + e.getMessage());
}
return null;
}
});
waitForUsersMetrics(1);
long writeMetaMetricForUserFoo =
getMetaMetrics().getWriteRequestCount() - writeMetaMetricBeforeNextuser;
long readMetaMetricBeforeNextuser = getMetaMetrics().getReadRequestCount();
userBar.runAs(new PrivilegedAction<Void>() {
@Override public Void run() {
try {
doGet();
} catch (IOException e) {
Assert.fail("Exception:" + e.getMessage());
}
return null;
}
});
waitForUsersMetrics(2);
long readMetaMetricForUserBar =
getMetaMetrics().getReadRequestCount() - readMetaMetricBeforeNextuser;
long filteredMetaReqeust = getMetaMetrics().getFilteredReadRequestCount();
userTest.runAs(new PrivilegedAction<Void>() {
@Override public Void run() {
try {
Table table = createConnection(UTIL.getConfiguration()).getTable(TABLE_NAME);
for (Result result : table.getScanner(new Scan().setFilter(new FilterAllFilter()))) {
Assert.fail("Should have filtered all rows");
}
} catch (IOException e) {
Assert.fail("Exception:" + e.getMessage());
}
return null;
}
});
waitForUsersMetrics(3);
long filteredMetaReqeustForTestUser =
getMetaMetrics().getFilteredReadRequestCount() - filteredMetaReqeust;
Map<byte[], UserMetrics> userMap =
ADMIN.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)).getLiveServerMetrics().values()
.iterator().next().getUserMetrics();
for (byte[] user : userMap.keySet()) {
switch (Bytes.toString(user)) {
case "FOO_USER_METRIC_TEST":
Assert.assertEquals(1,
userMap.get(user).getWriteRequestCount() - writeMetaMetricForUserFoo);
break;
case "BAR_USER_METRIC_TEST":
Assert
.assertEquals(1, userMap.get(user).getReadRequestCount() - readMetaMetricForUserBar);
Assert.assertEquals(0, userMap.get(user).getWriteRequestCount());
break;
case "TEST_USER_METRIC_TEST":
Assert.assertEquals(1,
userMap.get(user).getFilteredReadRequests() - filteredMetaReqeustForTestUser);
Assert.assertEquals(0, userMap.get(user).getWriteRequestCount());
break;
default:
//current user
Assert.assertEquals(UserProvider.instantiate(conf).getCurrent().getName(),
Bytes.toString(user));
//Read/write count because of Meta operations
Assert.assertTrue(userMap.get(user).getReadRequestCount() > 1);
break;
}
}
UTIL.deleteTable(TABLE_NAME);
}
private RegionMetrics getMetaMetrics() throws IOException {
for (ServerMetrics serverMetrics : ADMIN.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS))
.getLiveServerMetrics().values()) {
RegionMetrics metaMetrics = serverMetrics.getRegionMetrics()
.get(RegionInfoBuilder.FIRST_META_REGIONINFO.getRegionName());
if (metaMetrics != null) {
return metaMetrics;
}
}
Assert.fail("Should have find meta metrics");
return null;
}
private void waitForUsersMetrics(int noOfUsers) throws Exception {
//Sleep for metrics to get updated on master
Thread.sleep(5000);
Waiter.waitFor(CLUSTER.getConfiguration(), 10 * 1000, 100, new Predicate<Exception>() {
@Override public boolean evaluate() throws Exception {
Map<byte[], UserMetrics> metrics =
ADMIN.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)).getLiveServerMetrics().values()
.iterator().next().getUserMetrics();
Assert.assertNotNull(metrics);
//including current user + noOfUsers
return metrics.keySet().size() > noOfUsers;
}
});
}
private void doPut() throws IOException {
Table table = createConnection(UTIL.getConfiguration()).getTable(TABLE_NAME);
table.put(new Put(Bytes.toBytes("a")).addColumn(CF, Bytes.toBytes("col1"), Bytes.toBytes("1")));
}
private void doGet() throws IOException {
Table table = createConnection(UTIL.getConfiguration()).getTable(TABLE_NAME);
table.get(new Get(Bytes.toBytes("a")).addColumn(CF, Bytes.toBytes("col1")));
}
private Connection createConnection(Configuration conf) throws IOException {
User user = UserProvider.instantiate(conf).getCurrent();
return ClusterConnectionFactory.createAsyncClusterConnection(conf, null, user).toConnection();
}
@Test
public void testOtherStatusInfos() throws Exception {
EnumSet<Option> options =

View File

@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Size;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.UserMetrics;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionStatesCount;
import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
@ -356,6 +357,10 @@ public class TestRegionsRecoveryChore {
return regionMetricsMap;
}
@Override public Map<byte[], UserMetrics> getUserMetrics() {
return new HashMap<>();
}
@Override
public Set<String> getCoprocessorNames() {
return null;

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.regionserver;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CompatibilityFactory;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.test.MetricsAssertHelper;
@ -38,7 +39,7 @@ public class TestMetricsRegion {
@Test
public void testRegionWrapperMetrics() {
MetricsRegion mr = new MetricsRegion(new MetricsRegionWrapperStub());
MetricsRegion mr = new MetricsRegion(new MetricsRegionWrapperStub(), new Configuration());
MetricsRegionAggregateSource agg = mr.getSource().getAggregateSource();
HELPER.assertGauge(
@ -75,7 +76,7 @@ public class TestMetricsRegion {
mr.close();
// test region with replica id > 0
mr = new MetricsRegion(new MetricsRegionWrapperStub(1));
mr = new MetricsRegion(new MetricsRegionWrapperStub(1), new Configuration());
agg = mr.getSource().getAggregateSource();
HELPER.assertGauge(
"namespace_TestNS_table_MetricsRegionWrapperStub_region_DEADBEEF001_metric_storeCount",