diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerLoad.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerLoad.java index 15c8e63ae95..b22d6c4e244 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerLoad.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerLoad.java @@ -412,6 +412,10 @@ public class ServerLoad implements ServerMetrics { return metrics.getRegionMetrics(); } + @Override public Map getUserMetrics() { + return metrics.getUserMetrics(); + } + @Override public Set getCoprocessorNames() { return metrics.getCoprocessorNames(); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetrics.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetrics.java index 391e62ff390..21fad92aa25 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetrics.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetrics.java @@ -93,6 +93,11 @@ public interface ServerMetrics { */ Map getRegionMetrics(); + /** + * @return metrics per user + */ + Map getUserMetrics(); + /** * Return the RegionServer-level and Region-level coprocessors * @return string set of loaded RegionServer-level and Region-level coprocessors diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetricsBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetricsBuilder.java index a9fa71f24e8..e5cd7b2fd64 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetricsBuilder.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetricsBuilder.java @@ -77,6 +77,8 @@ public final class ServerMetricsBuilder { .map(HBaseProtos.Coprocessor::getName).collect(Collectors.toList())) .setRegionMetrics(serverLoadPB.getRegionLoadsList().stream() .map(RegionMetricsBuilder::toRegionMetrics).collect(Collectors.toList())) + .setUserMetrics(serverLoadPB.getUserLoadsList().stream() + 
.map(UserMetricsBuilder::toUserMetrics).collect(Collectors.toList())) .setReplicationLoadSources(serverLoadPB.getReplLoadSourceList().stream() .map(ProtobufUtil::toReplicationLoadSource).collect(Collectors.toList())) .setReplicationLoadSink(serverLoadPB.hasReplLoadSink() @@ -100,19 +102,19 @@ public final class ServerMetricsBuilder { .setInfoServerPort(metrics.getInfoServerPort()) .setMaxHeapMB((int) metrics.getMaxHeapSize().get(Size.Unit.MEGABYTE)) .setUsedHeapMB((int) metrics.getUsedHeapSize().get(Size.Unit.MEGABYTE)) - .addAllCoprocessors(toCoprocessor(metrics.getCoprocessorNames())) - .addAllRegionLoads(metrics.getRegionMetrics().values().stream() - .map(RegionMetricsBuilder::toRegionLoad) - .collect(Collectors.toList())) - .addAllReplLoadSource(metrics.getReplicationLoadSourceList().stream() - .map(ProtobufUtil::toReplicationLoadSource) - .collect(Collectors.toList())) + .addAllCoprocessors(toCoprocessor(metrics.getCoprocessorNames())).addAllRegionLoads( + metrics.getRegionMetrics().values().stream().map(RegionMetricsBuilder::toRegionLoad) + .collect(Collectors.toList())).addAllUserLoads( + metrics.getUserMetrics().values().stream().map(UserMetricsBuilder::toUserMetrics) + .collect(Collectors.toList())).addAllReplLoadSource( + metrics.getReplicationLoadSourceList().stream() + .map(ProtobufUtil::toReplicationLoadSource).collect(Collectors.toList())) .setReportStartTime(metrics.getLastReportTimestamp()) .setReportEndTime(metrics.getReportTimestamp()); if (metrics.getReplicationLoadSink() != null) { - builder.setReplLoadSink(ProtobufUtil.toReplicationLoadSink( - metrics.getReplicationLoadSink())); + builder.setReplLoadSink(ProtobufUtil.toReplicationLoadSink(metrics.getReplicationLoadSink())); } + return builder.build(); } @@ -132,6 +134,7 @@ public final class ServerMetricsBuilder { @Nullable private ReplicationLoadSink sink = null; private final Map regionStatus = new TreeMap<>(Bytes.BYTES_COMPARATOR); + private final Map userMetrics = new 
TreeMap<>(Bytes.BYTES_COMPARATOR); private final Set coprocessorNames = new TreeSet<>(); private long reportTimestamp = System.currentTimeMillis(); private long lastReportTimestamp = 0; @@ -189,6 +192,11 @@ public final class ServerMetricsBuilder { return this; } + public ServerMetricsBuilder setUserMetrics(List value) { + value.forEach(v -> this.userMetrics.put(v.getUserName(), v)); + return this; + } + public ServerMetricsBuilder setCoprocessorNames(List value) { coprocessorNames.addAll(value); return this; @@ -219,7 +227,8 @@ public final class ServerMetricsBuilder { regionStatus, coprocessorNames, reportTimestamp, - lastReportTimestamp); + lastReportTimestamp, + userMetrics); } private static class ServerMetricsImpl implements ServerMetrics { @@ -238,12 +247,13 @@ public final class ServerMetricsBuilder { private final Set coprocessorNames; private final long reportTimestamp; private final long lastReportTimestamp; + private final Map userMetrics; ServerMetricsImpl(ServerName serverName, int versionNumber, String version, long requestCountPerSecond, long requestCount, Size usedHeapSize, Size maxHeapSize, int infoServerPort, List sources, ReplicationLoadSink sink, Map regionStatus, Set coprocessorNames, long reportTimestamp, - long lastReportTimestamp) { + long lastReportTimestamp, Map userMetrics) { this.serverName = Preconditions.checkNotNull(serverName); this.versionNumber = versionNumber; this.version = version; @@ -255,6 +265,7 @@ public final class ServerMetricsBuilder { this.sources = Preconditions.checkNotNull(sources); this.sink = sink; this.regionStatus = Preconditions.checkNotNull(regionStatus); + this.userMetrics = Preconditions.checkNotNull(userMetrics); this.coprocessorNames =Preconditions.checkNotNull(coprocessorNames); this.reportTimestamp = reportTimestamp; this.lastReportTimestamp = lastReportTimestamp; @@ -324,6 +335,11 @@ public final class ServerMetricsBuilder { return Collections.unmodifiableMap(regionStatus); } + @Override + public Map 
getUserMetrics() { + return Collections.unmodifiableMap(userMetrics); + } + @Override public Set getCoprocessorNames() { return Collections.unmodifiableSet(coprocessorNames); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/UserMetrics.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/UserMetrics.java new file mode 100644 index 00000000000..6c2ba07cc3d --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/UserMetrics.java @@ -0,0 +1,86 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase; + +import java.util.Map; + +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.yetus.audience.InterfaceStability; + +/** + * Encapsulates per-user load metrics. 
+ */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public interface UserMetrics { + + interface ClientMetrics { + + String getHostName(); + + long getReadRequestsCount(); + + long getWriteRequestsCount(); + + long getFilteredReadRequestsCount(); + } + + /** + * @return the user name + */ + byte[] getUserName(); + + /** + * @return the number of read requests made by user + */ + long getReadRequestCount(); + + /** + * @return the number of write requests made by user + */ + long getWriteRequestCount(); + + /** + * @return the number of write requests and read requests and coprocessor + * service requests made by the user + */ + default long getRequestCount() { + return getReadRequestCount() + getWriteRequestCount(); + } + + /** + * @return the user name as a string + */ + default String getNameAsString() { + return Bytes.toStringBinary(getUserName()); + } + + /** + * @return metrics per client(hostname) + */ + Map getClientMetrics(); + + /** + * @return count of filtered read requests for a user + */ + long getFilteredReadRequests(); +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/UserMetricsBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/UserMetricsBuilder.java new file mode 100644 index 00000000000..70d28883c26 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/UserMetricsBuilder.java @@ -0,0 +1,151 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.hadoop.hbase.util.Strings; +import org.apache.yetus.audience.InterfaceAudience; + +import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos; + +@InterfaceAudience.Private +public final class UserMetricsBuilder { + + public static UserMetrics toUserMetrics(ClusterStatusProtos.UserLoad userLoad) { + UserMetricsBuilder builder = UserMetricsBuilder.newBuilder(userLoad.getUserName().getBytes(java.nio.charset.StandardCharsets.UTF_8)); + userLoad.getClientMetricsList().stream().map( + clientMetrics -> new ClientMetricsImpl(clientMetrics.getHostName(), + clientMetrics.getReadRequestsCount(), clientMetrics.getWriteRequestsCount(), + clientMetrics.getFilteredRequestsCount())).forEach(builder::addClientMetrics); + return builder.build(); + } + + public static ClusterStatusProtos.UserLoad toUserMetrics(UserMetrics userMetrics) { + ClusterStatusProtos.UserLoad.Builder builder = + ClusterStatusProtos.UserLoad.newBuilder().setUserName(userMetrics.getNameAsString()); + userMetrics.getClientMetrics().values().stream().map( + clientMetrics -> ClusterStatusProtos.ClientMetrics.newBuilder() + .setHostName(clientMetrics.getHostName()) + .setWriteRequestsCount(clientMetrics.getWriteRequestsCount()) + .setReadRequestsCount(clientMetrics.getReadRequestsCount()) + .setFilteredRequestsCount(clientMetrics.getFilteredReadRequestsCount()).build()) + .forEach(builder::addClientMetrics); + return builder.build(); + 
} + + public static UserMetricsBuilder newBuilder(byte[] name) { + return new UserMetricsBuilder(name); + } + + + private final byte[] name; + private Map<String, UserMetrics.ClientMetrics> clientMetricsMap = new HashMap<>(); + private UserMetricsBuilder(byte[] name) { + this.name = name; + } + + public UserMetricsBuilder addClientMetrics(UserMetrics.ClientMetrics clientMetrics) { + clientMetricsMap.put(clientMetrics.getHostName(), clientMetrics); + return this; + } + + public UserMetrics build() { + return new UserMetricsImpl(name, clientMetricsMap); + } + + public static class ClientMetricsImpl implements UserMetrics.ClientMetrics { + private final long filteredReadRequestsCount; + private final String hostName; + private final long readRequestCount; + private final long writeRequestCount; + + public ClientMetricsImpl(String hostName, long readRequest, long writeRequest, + long filteredReadRequestsCount) { + this.hostName = hostName; + this.readRequestCount = readRequest; + this.writeRequestCount = writeRequest; + this.filteredReadRequestsCount = filteredReadRequestsCount; + } + + @Override public String getHostName() { + return hostName; + } + + @Override public long getReadRequestsCount() { + return readRequestCount; + } + + @Override public long getWriteRequestsCount() { + return writeRequestCount; + } + + @Override public long getFilteredReadRequestsCount() { + return filteredReadRequestsCount; + } + } + + private static class UserMetricsImpl implements UserMetrics { + private final byte[] name; + private final Map<String, ClientMetrics> clientMetricsMap; + + UserMetricsImpl(byte[] name, Map<String, ClientMetrics> clientMetricsMap) { + this.name = Preconditions.checkNotNull(name); + this.clientMetricsMap = clientMetricsMap; + } + + @Override public byte[] getUserName() { + return name; + } + + @Override public long getReadRequestCount() { + return clientMetricsMap.values().stream().map(c -> c.getReadRequestsCount()) + .reduce(0L, Long::sum); + } + + @Override public long getWriteRequestCount() { + return 
clientMetricsMap.values().stream().map(c -> c.getWriteRequestsCount()) + .reduce(0L, Long::sum); + } + + @Override public Map getClientMetrics() { + return this.clientMetricsMap; + } + + @Override public long getFilteredReadRequests() { + return clientMetricsMap.values().stream().map(c -> c.getFilteredReadRequestsCount()) + .reduce(0L, Long::sum); + } + + @Override + public String toString() { + StringBuilder sb = Strings + .appendKeyValue(new StringBuilder(), "readRequestCount", this.getReadRequestCount()); + Strings.appendKeyValue(sb, "writeRequestCount", this.getWriteRequestCount()); + Strings.appendKeyValue(sb, "filteredReadRequestCount", this.getFilteredReadRequests()); + return sb.toString(); + } + } + +} diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateSource.java index 0ffb928f189..ee570f00d99 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateSource.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateSource.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hbase.regionserver; +import java.util.Map; + import org.apache.hadoop.hbase.metrics.BaseSource; import org.apache.yetus.audience.InterfaceAudience; @@ -59,4 +61,6 @@ public interface MetricsUserAggregateSource extends BaseSource { MetricsUserSource getOrCreateMetricsUser(String user); void deregister(MetricsUserSource toRemove); + + Map getUserSources(); } diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserSource.java index b20dca6b1b5..96173669bbc 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserSource.java +++ 
b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserSource.java @@ -18,11 +18,31 @@ package org.apache.hadoop.hbase.regionserver; +import java.util.Map; + +import org.apache.hadoop.metrics2.MetricsCollector; import org.apache.yetus.audience.InterfaceAudience; @InterfaceAudience.Private public interface MetricsUserSource extends Comparable { + //These client metrics will be reported through clusterStatus and hbtop only + interface ClientMetrics { + void incrementReadRequest(); + + void incrementWriteRequest(); + + String getHostName(); + + long getReadRequestsCount(); + + long getWriteRequestsCount(); + + void incrementFilteredReadRequests(); + + long getFilteredReadRequests(); + } + String getUser(); void register(); @@ -42,4 +62,21 @@ public interface MetricsUserSource extends Comparable { void updateReplay(long t); void updateScanTime(long t); + + void getMetrics(MetricsCollector metricsCollector, boolean all); + + /** + * Metrics collected at client level for a user(needed for reporting through clusterStatus + * and hbtop currently) + * @return metrics per hostname + */ + Map getClientMetrics(); + + /** + * Create a instance of ClientMetrics if not present otherwise return the previous one + * + * @param hostName hostname of the client + * @return Instance of ClientMetrics + */ + ClientMetrics getOrCreateMetricsClient(String hostName); } diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/metrics2/MetricHistogram.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/metrics2/MetricHistogram.java index 835c50ba769..bc1e8cb242f 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/metrics2/MetricHistogram.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/metrics2/MetricHistogram.java @@ -47,4 +47,10 @@ public interface MetricHistogram { */ void add(long value); + /** + * Return the total number of values added to the histogram. + * @return the total number of values. 
+ */ + long getCount(); + } diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateSourceImpl.java index c447f40f19e..28726c4ee1f 100644 --- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateSourceImpl.java +++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateSourceImpl.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hbase.regionserver; +import java.util.Collections; +import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import org.apache.hadoop.hbase.metrics.BaseSourceImpl; @@ -28,8 +30,6 @@ import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; - @InterfaceAudience.Private public class MetricsUserAggregateSourceImpl extends BaseSourceImpl implements MetricsUserAggregateSource { @@ -90,9 +90,9 @@ public class MetricsUserAggregateSourceImpl extends BaseSourceImpl } } - @VisibleForTesting - public ConcurrentHashMap getUserSources() { - return userSources; + @Override + public Map getUserSources() { + return Collections.unmodifiableMap(userSources); } @Override diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserSourceImpl.java index 9f714a359e4..ef0eb7bf462 100644 --- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserSourceImpl.java +++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserSourceImpl.java @@ -18,9 +18,14 @@ package org.apache.hadoop.hbase.regionserver; +import java.util.Collections; +import java.util.Map; +import 
java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.LongAdder; import org.apache.hadoop.metrics2.MetricHistogram; +import org.apache.hadoop.metrics2.MetricsCollector; import org.apache.hadoop.metrics2.MetricsRecordBuilder; import org.apache.hadoop.metrics2.lib.DynamicMetricsRegistry; import org.apache.yetus.audience.InterfaceAudience; @@ -57,6 +62,48 @@ public class MetricsUserSourceImpl implements MetricsUserSource { private final MetricsUserAggregateSourceImpl agg; private final DynamicMetricsRegistry registry; + private ConcurrentHashMap clientMetricsMap; + + static class ClientMetricsImpl implements ClientMetrics { + private final String hostName; + final LongAdder readRequestsCount = new LongAdder(); + final LongAdder writeRequestsCount = new LongAdder(); + final LongAdder filteredRequestsCount = new LongAdder(); + + public ClientMetricsImpl(String hostName) { + this.hostName = hostName; + } + + @Override public void incrementReadRequest() { + readRequestsCount.increment(); + } + + @Override public void incrementWriteRequest() { + writeRequestsCount.increment(); + } + + @Override public String getHostName() { + return hostName; + } + + @Override public long getReadRequestsCount() { + return readRequestsCount.sum(); + } + + @Override public long getWriteRequestsCount() { + return writeRequestsCount.sum(); + } + + @Override public void incrementFilteredReadRequests() { + filteredRequestsCount.increment(); + + } + + @Override public long getFilteredReadRequests() { + return filteredRequestsCount.sum(); + } + } + public MetricsUserSourceImpl(String user, MetricsUserAggregateSourceImpl agg) { if (LOG.isDebugEnabled()) { LOG.debug("Creating new MetricsUserSourceImpl for user " + user); @@ -77,7 +124,7 @@ public class MetricsUserSourceImpl implements MetricsUserSource { userIncrementKey = userNamePrefix + MetricsRegionServerSource.INCREMENT_KEY; userAppendKey = userNamePrefix + 
MetricsRegionServerSource.APPEND_KEY; userReplayKey = userNamePrefix + MetricsRegionServerSource.REPLAY_KEY; - + clientMetricsMap = new ConcurrentHashMap<>(); agg.register(this); } @@ -204,4 +251,27 @@ public class MetricsUserSourceImpl implements MetricsUserSource { public void updateScanTime(long t) { scanTimeHisto.add(t); } + + @Override public void getMetrics(MetricsCollector metricsCollector, boolean all) { + MetricsRecordBuilder mrb = metricsCollector.addRecord(this.userNamePrefix); + registry.snapshot(mrb, all); + } + + @Override public Map getClientMetrics() { + return Collections.unmodifiableMap(clientMetricsMap); + } + + @Override public ClientMetrics getOrCreateMetricsClient(String client) { + ClientMetrics source = clientMetricsMap.get(client); + if (source != null) { + return source; + } + source = new ClientMetricsImpl(client); + ClientMetrics prev = clientMetricsMap.putIfAbsent(client, source); + if (prev != null) { + return prev; + } + return source; + } + } diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/metrics2/lib/MutableHistogram.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/metrics2/lib/MutableHistogram.java index 75dc300ba44..dc86ebe8bf7 100644 --- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/metrics2/lib/MutableHistogram.java +++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/metrics2/lib/MutableHistogram.java @@ -51,6 +51,10 @@ public class MutableHistogram extends MutableMetric implements MetricHistogram { histogram.update(val); } + @Override public long getCount() { + return histogram.getCount(); + } + public long getMax() { return histogram.getMax(); } diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/metrics2/lib/MutableRangeHistogram.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/metrics2/lib/MutableRangeHistogram.java index 273154f9ff2..4a406cc4679 100644 --- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/metrics2/lib/MutableRangeHistogram.java +++ 
b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/metrics2/lib/MutableRangeHistogram.java @@ -83,5 +83,9 @@ public abstract class MutableRangeHistogram extends MutableHistogram implements Interns.info(name + "_" + rangeType + "_" + ranges[ranges.length - 1] + "-inf", desc), val - cumNum); } - } + } + + @Override public long getCount() { + return histogram.getCount(); + } } diff --git a/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/RecordFilter.java b/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/RecordFilter.java index aaef965c4e9..c7093ddd9d4 100644 --- a/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/RecordFilter.java +++ b/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/RecordFilter.java @@ -143,6 +143,10 @@ public final class RecordFilter { this.value = Objects.requireNonNull(value); } + public Field getField() { + return field; + } + public boolean execute(Record record) { FieldValue fieldValue = record.get(field); if (fieldValue == null) { diff --git a/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/field/Field.java b/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/field/Field.java index 6e5f66f6244..df460dd31cf 100644 --- a/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/field/Field.java +++ b/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/field/Field.java @@ -57,7 +57,11 @@ public enum Field { FieldValueType.STRING), REGION_COUNT("#REGION", "Region Count", false, false, FieldValueType.INTEGER), USED_HEAP_SIZE("UHEAP", "Used Heap Size", false, false, FieldValueType.SIZE), - MAX_HEAP_SIZE("MHEAP", "Max Heap Size", false, false, FieldValueType.SIZE); + USER("USER", "user Name", true, true, FieldValueType.STRING), + MAX_HEAP_SIZE("MHEAP", "Max Heap Size", false, false, FieldValueType.SIZE), + CLIENT_COUNT("#CLIENT", "Client Count", false, false, FieldValueType.INTEGER), + USER_COUNT("#USER", "User Count", false, false, FieldValueType.INTEGER), + CLIENT("CLIENT", "Client Hostname", true, 
true, FieldValueType.STRING); private final String header; private final String description; diff --git a/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/mode/ClientModeStrategy.java b/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/mode/ClientModeStrategy.java new file mode 100644 index 00000000000..fe3edd1b254 --- /dev/null +++ b/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/mode/ClientModeStrategy.java @@ -0,0 +1,157 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.hbtop.mode; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import org.apache.hadoop.hbase.ClusterMetrics; +import org.apache.hadoop.hbase.ServerMetrics; +import org.apache.hadoop.hbase.UserMetrics; +import org.apache.hadoop.hbase.hbtop.Record; +import org.apache.hadoop.hbase.hbtop.RecordFilter; +import org.apache.hadoop.hbase.hbtop.field.Field; +import org.apache.hadoop.hbase.hbtop.field.FieldInfo; +import org.apache.hadoop.hbase.hbtop.field.FieldValue; +import org.apache.hadoop.hbase.hbtop.field.FieldValueType; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Implementation for {@link ModeStrategy} for client Mode. + */ +@InterfaceAudience.Private public final class ClientModeStrategy implements ModeStrategy { + + private final List fieldInfos = Arrays + .asList(new FieldInfo(Field.CLIENT, 0, true), + new FieldInfo(Field.USER_COUNT, 5, true), + new FieldInfo(Field.REQUEST_COUNT_PER_SECOND, 10, true), + new FieldInfo(Field.READ_REQUEST_COUNT_PER_SECOND, 10, true), + new FieldInfo(Field.WRITE_REQUEST_COUNT_PER_SECOND, 10, true), + new FieldInfo(Field.FILTERED_READ_REQUEST_COUNT_PER_SECOND, 10, true)); + private final Map requestCountPerSecondMap = new HashMap<>(); + + ClientModeStrategy() { + } + + @Override public List getFieldInfos() { + return fieldInfos; + } + + @Override public Field getDefaultSortField() { + return Field.REQUEST_COUNT_PER_SECOND; + } + + @Override public List getRecords(ClusterMetrics clusterMetrics, + List pushDownFilters) { + List records = createRecords(clusterMetrics); + return aggregateRecordsAndAddDistinct( + ModeStrategyUtils.applyFilterAndGet(records, pushDownFilters), Field.CLIENT, Field.USER, + Field.USER_COUNT); + } + + List createRecords(ClusterMetrics clusterMetrics) { + List 
ret = new ArrayList<>(); + for (ServerMetrics serverMetrics : clusterMetrics.getLiveServerMetrics().values()) { + long lastReportTimestamp = serverMetrics.getLastReportTimestamp(); + serverMetrics.getUserMetrics().values().forEach(um -> um.getClientMetrics().values().forEach( + clientMetrics -> ret.add( + createRecord(um.getNameAsString(), clientMetrics, lastReportTimestamp, + serverMetrics.getServerName().getServerName())))); + } + return ret; + } + + /** + * Aggregate the records and count the unique values for the given distinctField + * + * @param records records to be processed + * @param groupBy Field on which group by needs to be done + * @param distinctField Field whose unique values needs to be counted + * @param uniqueCountAssignedTo a target field to which the unique count is assigned to + * @return aggregated records + */ + List aggregateRecordsAndAddDistinct(List records, Field groupBy, + Field distinctField, Field uniqueCountAssignedTo) { + List result = new ArrayList<>(); + records.stream().collect(Collectors.groupingBy(r -> r.get(groupBy))).values() + .forEach(val -> { + Set distinctValues = new HashSet<>(); + Map map = new HashMap<>(); + for (Record record : val) { + for (Map.Entry field : record.entrySet()) { + if (distinctField.equals(field.getKey())) { + //We will not be adding the field in the new record whose distinct count is required + distinctValues.add(record.get(distinctField)); + } else { + if (field.getKey().getFieldValueType() == FieldValueType.STRING) { + map.put(field.getKey(), field.getValue()); + } else { + if (map.get(field.getKey()) == null) { + map.put(field.getKey(), field.getValue()); + } else { + map.put(field.getKey(), map.get(field.getKey()).plus(field.getValue())); + } + } + } + } + } + // Add unique count field + map.put(uniqueCountAssignedTo, uniqueCountAssignedTo.newValue(distinctValues.size())); + result.add(Record.ofEntries(map.entrySet().stream() + .map(k -> Record.entry(k.getKey(), k.getValue())))); + }); + return 
result; + } + + Record createRecord(String user, UserMetrics.ClientMetrics clientMetrics, + long lastReportTimestamp, String server) { + Record.Builder builder = Record.builder(); + String client = clientMetrics.getHostName(); + builder.put(Field.CLIENT, clientMetrics.getHostName()); + String mapKey = client + "$" + user + "$" + server; + RequestCountPerSecond requestCountPerSecond = requestCountPerSecondMap.get(mapKey); + if (requestCountPerSecond == null) { + requestCountPerSecond = new RequestCountPerSecond(); + requestCountPerSecondMap.put(mapKey, requestCountPerSecond); + } + requestCountPerSecond.refresh(lastReportTimestamp, clientMetrics.getReadRequestsCount(), + clientMetrics.getFilteredReadRequestsCount(), clientMetrics.getWriteRequestsCount()); + builder.put(Field.REQUEST_COUNT_PER_SECOND, requestCountPerSecond.getRequestCountPerSecond()); + builder.put(Field.READ_REQUEST_COUNT_PER_SECOND, + requestCountPerSecond.getReadRequestCountPerSecond()); + builder.put(Field.WRITE_REQUEST_COUNT_PER_SECOND, + requestCountPerSecond.getWriteRequestCountPerSecond()); + builder.put(Field.FILTERED_READ_REQUEST_COUNT_PER_SECOND, + requestCountPerSecond.getFilteredReadRequestCountPerSecond()); + builder.put(Field.USER, user); + return builder.build(); + } + + @Override public DrillDownInfo drillDown(Record selectedRecord) { + List initialFilters = Collections.singletonList( + RecordFilter.newBuilder(Field.CLIENT).doubleEquals(selectedRecord.get(Field.CLIENT))); + return new DrillDownInfo(Mode.USER, initialFilters); + } +} diff --git a/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/mode/Mode.java b/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/mode/Mode.java index 1290e6916cb..ffd98dfd683 100644 --- a/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/mode/Mode.java +++ b/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/mode/Mode.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.Objects; import 
org.apache.hadoop.hbase.ClusterMetrics; import org.apache.hadoop.hbase.hbtop.Record; +import org.apache.hadoop.hbase.hbtop.RecordFilter; import org.apache.hadoop.hbase.hbtop.field.Field; import org.apache.hadoop.hbase.hbtop.field.FieldInfo; import org.apache.yetus.audience.InterfaceAudience; @@ -35,7 +36,9 @@ public enum Mode { NAMESPACE("Namespace", "Record per Namespace", new NamespaceModeStrategy()), TABLE("Table", "Record per Table", new TableModeStrategy()), REGION("Region", "Record per Region", new RegionModeStrategy()), - REGION_SERVER("RegionServer", "Record per RegionServer", new RegionServerModeStrategy()); + REGION_SERVER("RegionServer", "Record per RegionServer", new RegionServerModeStrategy()), + USER("User", "Record per user", new UserModeStrategy()), + CLIENT("Client", "Record per client", new ClientModeStrategy()); private final String header; private final String description; @@ -55,8 +58,9 @@ public enum Mode { return description; } - public List getRecords(ClusterMetrics clusterMetrics) { - return modeStrategy.getRecords(clusterMetrics); + public List getRecords(ClusterMetrics clusterMetrics, + List pushDownFilters) { + return modeStrategy.getRecords(clusterMetrics, pushDownFilters); } public List getFieldInfos() { diff --git a/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/mode/ModeStrategy.java b/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/mode/ModeStrategy.java index 09fa297e303..021cee25810 100644 --- a/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/mode/ModeStrategy.java +++ b/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/mode/ModeStrategy.java @@ -21,6 +21,7 @@ import edu.umd.cs.findbugs.annotations.Nullable; import java.util.List; import org.apache.hadoop.hbase.ClusterMetrics; import org.apache.hadoop.hbase.hbtop.Record; +import org.apache.hadoop.hbase.hbtop.RecordFilter; import org.apache.hadoop.hbase.hbtop.field.Field; import org.apache.hadoop.hbase.hbtop.field.FieldInfo; import 
org.apache.yetus.audience.InterfaceAudience; @@ -33,6 +34,6 @@ import org.apache.yetus.audience.InterfaceAudience; interface ModeStrategy { List getFieldInfos(); Field getDefaultSortField(); - List getRecords(ClusterMetrics clusterMetrics); + List getRecords(ClusterMetrics clusterMetrics, List pushDownFilters); @Nullable DrillDownInfo drillDown(Record selectedRecord); } diff --git a/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/mode/ModeStrategyUtils.java b/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/mode/ModeStrategyUtils.java new file mode 100644 index 00000000000..9175820e0ca --- /dev/null +++ b/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/mode/ModeStrategyUtils.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.hbtop.mode; + +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import org.apache.hadoop.hbase.hbtop.Record; +import org.apache.hadoop.hbase.hbtop.RecordFilter; +import org.apache.hadoop.hbase.hbtop.field.Field; + +public final class ModeStrategyUtils { + private ModeStrategyUtils() { + + } + + /** + * Filter records as per the supplied filters, + * @param records records to be processed + * @param filters List of filters + * @return filtered records + */ + public static List applyFilterAndGet(List records, + List filters) { + if (filters != null && !filters.isEmpty()) { + return records.stream().filter(r -> filters.stream().allMatch(f -> f.execute(r))) + .collect(Collectors.toList()); + } + return records; + } + + + /** + * Group by records on the basis of supplied groupBy field and + * Aggregate records using {@link Record#combine(Record)} + * + * @param records records needs to be processed + * @param groupBy Field to be used for group by + * @return aggregated records + */ + public static List aggregateRecords(List records, Field groupBy) { + return records.stream().collect(Collectors.groupingBy(r -> r.get(groupBy))).entrySet().stream() + .flatMap(e -> e.getValue().stream().reduce(Record::combine).map(Stream::of) + .orElse(Stream.empty())).collect(Collectors.toList()); + } + +} diff --git a/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/mode/NamespaceModeStrategy.java b/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/mode/NamespaceModeStrategy.java index 866f57e4ded..f74d8bf22eb 100644 --- a/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/mode/NamespaceModeStrategy.java +++ b/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/mode/NamespaceModeStrategy.java @@ -20,8 +20,7 @@ package org.apache.hadoop.hbase.hbtop.mode; import java.util.Arrays; import java.util.Collections; import java.util.List; -import java.util.stream.Collectors; -import 
java.util.stream.Stream; + import org.apache.hadoop.hbase.ClusterMetrics; import org.apache.hadoop.hbase.hbtop.Record; import org.apache.hadoop.hbase.hbtop.RecordFilter; @@ -64,27 +63,14 @@ public final class NamespaceModeStrategy implements ModeStrategy { return Field.REQUEST_COUNT_PER_SECOND; } - @Override - public List getRecords(ClusterMetrics clusterMetrics) { + @Override public List getRecords(ClusterMetrics clusterMetrics, + List pushDownFilters) { // Get records from RegionModeStrategy and add REGION_COUNT field - List records = regionModeStrategy.getRecords(clusterMetrics).stream() - .map(record -> - Record.ofEntries(fieldInfos.stream() - .filter(fi -> record.containsKey(fi.getField())) - .map(fi -> Record.entry(fi.getField(), record.get(fi.getField()))))) - .map(record -> Record.builder().putAll(record).put(Field.REGION_COUNT, 1).build()) - .collect(Collectors.toList()); + List records = regionModeStrategy.selectModeFieldsAndAddCountField(fieldInfos, + regionModeStrategy.getRecords(clusterMetrics, pushDownFilters), Field.REGION_COUNT); // Aggregation by NAMESPACE field - return records.stream() - .collect(Collectors.groupingBy(r -> r.get(Field.NAMESPACE).asString())) - .entrySet().stream() - .flatMap( - e -> e.getValue().stream() - .reduce(Record::combine) - .map(Stream::of) - .orElse(Stream.empty())) - .collect(Collectors.toList()); + return ModeStrategyUtils.aggregateRecords(records, Field.NAMESPACE); } @Override diff --git a/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/mode/RegionModeStrategy.java b/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/mode/RegionModeStrategy.java index e5deda06be1..0adbc823bf4 100644 --- a/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/mode/RegionModeStrategy.java +++ b/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/mode/RegionModeStrategy.java @@ -24,6 +24,8 @@ import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; +import 
java.util.stream.Collectors; + import org.apache.commons.lang3.time.FastDateFormat; import org.apache.hadoop.hbase.ClusterMetrics; import org.apache.hadoop.hbase.RegionMetrics; @@ -31,6 +33,7 @@ import org.apache.hadoop.hbase.ServerMetrics; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.hbtop.Record; +import org.apache.hadoop.hbase.hbtop.RecordFilter; import org.apache.hadoop.hbase.hbtop.field.Field; import org.apache.hadoop.hbase.hbtop.field.FieldInfo; import org.apache.hadoop.hbase.util.Bytes; @@ -83,8 +86,8 @@ public final class RegionModeStrategy implements ModeStrategy { return Field.REQUEST_COUNT_PER_SECOND; } - @Override - public List getRecords(ClusterMetrics clusterMetrics) { + @Override public List getRecords(ClusterMetrics clusterMetrics, + List pushDownFilters) { List ret = new ArrayList<>(); for (ServerMetrics sm : clusterMetrics.getLiveServerMetrics().values()) { long lastReportTimestamp = sm.getLastReportTimestamp(); @@ -174,6 +177,27 @@ public final class RegionModeStrategy implements ModeStrategy { return builder.build(); } + /** + * Form new record list with records formed by only fields provided through fieldInfo and + * add a count field for each record with value 1 + * We are doing two operation of selecting and adding new field + * because of saving some CPU cycles on rebuilding the record again + * + * @param fieldInfos List of FieldInfos required in the record + * @param records List of records which needs to be processed + * @param countField Field which needs to be added with value 1 for each record + * @return records after selecting required fields and adding count field + */ + List selectModeFieldsAndAddCountField(List fieldInfos, List records, + Field countField) { + + return records.stream().map(record -> Record.ofEntries( + fieldInfos.stream().filter(fi -> record.containsKey(fi.getField())) + .map(fi -> Record.entry(fi.getField(), 
record.get(fi.getField()))))) + .map(record -> Record.builder().putAll(record).put(countField, 1).build()) + .collect(Collectors.toList()); + } + @Nullable @Override public DrillDownInfo drillDown(Record selectedRecord) { diff --git a/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/mode/RegionServerModeStrategy.java b/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/mode/RegionServerModeStrategy.java index d64f713da6a..44a9a2c8271 100644 --- a/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/mode/RegionServerModeStrategy.java +++ b/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/mode/RegionServerModeStrategy.java @@ -23,7 +23,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.stream.Collectors; -import java.util.stream.Stream; + import org.apache.hadoop.hbase.ClusterMetrics; import org.apache.hadoop.hbase.ServerMetrics; import org.apache.hadoop.hbase.hbtop.Record; @@ -70,27 +70,15 @@ public final class RegionServerModeStrategy implements ModeStrategy { return Field.REQUEST_COUNT_PER_SECOND; } - @Override - public List getRecords(ClusterMetrics clusterMetrics) { + @Override public List getRecords(ClusterMetrics clusterMetrics, + List pushDownFilters) { // Get records from RegionModeStrategy and add REGION_COUNT field - List records = regionModeStrategy.getRecords(clusterMetrics).stream() - .map(record -> - Record.ofEntries(fieldInfos.stream() - .filter(fi -> record.containsKey(fi.getField())) - .map(fi -> Record.entry(fi.getField(), record.get(fi.getField()))))) - .map(record -> Record.builder().putAll(record).put(Field.REGION_COUNT, 1).build()) - .collect(Collectors.toList()); - + List records = regionModeStrategy.selectModeFieldsAndAddCountField(fieldInfos, + regionModeStrategy.getRecords(clusterMetrics, pushDownFilters), Field.REGION_COUNT); // Aggregation by LONG_REGION_SERVER field - Map retMap = records.stream() - .collect(Collectors.groupingBy(r -> 
r.get(Field.LONG_REGION_SERVER).asString())) - .entrySet().stream() - .flatMap( - e -> e.getValue().stream() - .reduce(Record::combine) - .map(Stream::of) - .orElse(Stream.empty())) - .collect(Collectors.toMap(r -> r.get(Field.LONG_REGION_SERVER).asString(), r -> r)); + Map retMap = + ModeStrategyUtils.aggregateRecords(records, Field.LONG_REGION_SERVER).stream() + .collect(Collectors.toMap(r -> r.get(Field.LONG_REGION_SERVER).asString(), r -> r)); // Add USED_HEAP_SIZE field and MAX_HEAP_SIZE field for (ServerMetrics sm : clusterMetrics.getLiveServerMetrics().values()) { diff --git a/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/mode/TableModeStrategy.java b/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/mode/TableModeStrategy.java index 1da074fc918..4acc3441258 100644 --- a/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/mode/TableModeStrategy.java +++ b/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/mode/TableModeStrategy.java @@ -65,16 +65,11 @@ public final class TableModeStrategy implements ModeStrategy { return Field.REQUEST_COUNT_PER_SECOND; } - @Override - public List getRecords(ClusterMetrics clusterMetrics) { + @Override public List getRecords(ClusterMetrics clusterMetrics, + List pushDownFilters) { // Get records from RegionModeStrategy and add REGION_COUNT field - List records = regionModeStrategy.getRecords(clusterMetrics).stream() - .map(record -> - Record.ofEntries(fieldInfos.stream() - .filter(fi -> record.containsKey(fi.getField())) - .map(fi -> Record.entry(fi.getField(), record.get(fi.getField()))))) - .map(record -> Record.builder().putAll(record).put(Field.REGION_COUNT, 1).build()) - .collect(Collectors.toList()); + List records = regionModeStrategy.selectModeFieldsAndAddCountField(fieldInfos, + regionModeStrategy.getRecords(clusterMetrics, pushDownFilters), Field.REGION_COUNT); // Aggregation by NAMESPACE field and TABLE field return records.stream() diff --git 
a/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/mode/UserModeStrategy.java b/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/mode/UserModeStrategy.java new file mode 100644 index 00000000000..605376e1221 --- /dev/null +++ b/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/mode/UserModeStrategy.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.hbtop.mode; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import org.apache.hadoop.hbase.ClusterMetrics; +import org.apache.hadoop.hbase.hbtop.Record; +import org.apache.hadoop.hbase.hbtop.RecordFilter; +import org.apache.hadoop.hbase.hbtop.field.Field; +import org.apache.hadoop.hbase.hbtop.field.FieldInfo; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Implementation for {@link ModeStrategy} for User Mode. 
+ */ +@InterfaceAudience.Private public final class UserModeStrategy implements ModeStrategy { + + private final List fieldInfos = Arrays + .asList(new FieldInfo(Field.USER, 0, true), + new FieldInfo(Field.CLIENT_COUNT, 7, true), + new FieldInfo(Field.REQUEST_COUNT_PER_SECOND, 10, true), + new FieldInfo(Field.READ_REQUEST_COUNT_PER_SECOND, 10, true), + new FieldInfo(Field.WRITE_REQUEST_COUNT_PER_SECOND, 10, true), + new FieldInfo(Field.FILTERED_READ_REQUEST_COUNT_PER_SECOND, 10, true)); + private final ClientModeStrategy clientModeStrategy = new ClientModeStrategy(); + + UserModeStrategy() { + } + + @Override public List getFieldInfos() { + return fieldInfos; + } + + @Override public Field getDefaultSortField() { + return Field.REQUEST_COUNT_PER_SECOND; + } + + @Override public List getRecords(ClusterMetrics clusterMetrics, + List pushDownFilters) { + List records = clientModeStrategy.createRecords(clusterMetrics); + return clientModeStrategy.aggregateRecordsAndAddDistinct( + ModeStrategyUtils.applyFilterAndGet(records, pushDownFilters), Field.USER, Field.CLIENT, + Field.CLIENT_COUNT); + } + + @Override public DrillDownInfo drillDown(Record selectedRecord) { + //Drill down to client and using selected USER as a filter + List initialFilters = Collections.singletonList( + RecordFilter.newBuilder(Field.USER).doubleEquals(selectedRecord.get(Field.USER))); + return new DrillDownInfo(Mode.CLIENT, initialFilters); + } +} diff --git a/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/screen/top/TopScreenModel.java b/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/screen/top/TopScreenModel.java index 42e81e156fb..3dd7f12b091 100644 --- a/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/screen/top/TopScreenModel.java +++ b/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/screen/top/TopScreenModel.java @@ -17,12 +17,15 @@ */ package org.apache.hadoop.hbase.hbtop.screen.top; +import static 
org.apache.commons.lang3.time.DateFormatUtils.ISO_8601_EXTENDED_TIME_FORMAT; + import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.stream.Collectors; -import org.apache.commons.lang3.time.DateFormatUtils; + import org.apache.hadoop.hbase.ClusterMetrics; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.hbtop.Record; @@ -56,6 +59,7 @@ public class TopScreenModel { private List records; private final List filters = new ArrayList<>(); + private final List pushDownFilters = new ArrayList<>(); private final List filterHistories = new ArrayList<>(); private boolean ascendingSort; @@ -88,6 +92,7 @@ public class TopScreenModel { if (initialFilters != null) { filters.addAll(initialFilters); } + decomposePushDownFilter(); } public void setSortFieldAndFields(Field sortField, List fields) { @@ -113,7 +118,7 @@ public class TopScreenModel { } private void refreshSummary(ClusterMetrics clusterMetrics) { - String currentTime = DateFormatUtils.ISO_8601_EXTENDED_TIME_FORMAT + String currentTime = ISO_8601_EXTENDED_TIME_FORMAT .format(System.currentTimeMillis()); String version = clusterMetrics.getHBaseVersion(); String clusterId = clusterMetrics.getClusterId(); @@ -130,7 +135,7 @@ public class TopScreenModel { } private void refreshRecords(ClusterMetrics clusterMetrics) { - List records = currentMode.getRecords(clusterMetrics); + List records = currentMode.getRecords(clusterMetrics, pushDownFilters); // Filter and sort records = records.stream() @@ -153,13 +158,13 @@ public class TopScreenModel { if (filter == null) { return false; } - filters.add(filter); filterHistories.add(filterString); return true; } public void clearFilters() { + pushDownFilters.clear(); filters.clear(); } @@ -203,4 +208,18 @@ public class TopScreenModel { public List getFilterHistories() { return Collections.unmodifiableList(filterHistories); } + + private void 
decomposePushDownFilter() { + pushDownFilters.clear(); + for (RecordFilter filter : filters) { + if (!fields.contains(filter.getField())) { + pushDownFilters.add(filter); + } + } + filters.removeAll(pushDownFilters); + } + + public Collection getPushDownFilters() { + return pushDownFilters; + } } diff --git a/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/screen/top/TopScreenPresenter.java b/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/screen/top/TopScreenPresenter.java index d435f5c1dfc..e4cd3867004 100644 --- a/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/screen/top/TopScreenPresenter.java +++ b/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/screen/top/TopScreenPresenter.java @@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; import org.apache.hadoop.hbase.hbtop.Record; +import org.apache.hadoop.hbase.hbtop.RecordFilter; import org.apache.hadoop.hbase.hbtop.field.Field; import org.apache.hadoop.hbase.hbtop.field.FieldInfo; import org.apache.hadoop.hbase.hbtop.mode.Mode; @@ -324,7 +325,8 @@ public class TopScreenPresenter { } public ScreenView goToFilterDisplayMode(Screen screen, Terminal terminal, int row) { - return new FilterDisplayModeScreenView(screen, terminal, row, topScreenModel.getFilters(), - topScreenView); + ArrayList filters = new ArrayList<>(topScreenModel.getFilters()); + filters.addAll(topScreenModel.getPushDownFilters()); + return new FilterDisplayModeScreenView(screen, terminal, row, filters, topScreenView); } } diff --git a/hbase-hbtop/src/test/java/org/apache/hadoop/hbase/hbtop/TestUtils.java b/hbase-hbtop/src/test/java/org/apache/hadoop/hbase/hbtop/TestUtils.java index 43a84474027..bad2a00aec4 100644 --- a/hbase-hbtop/src/test/java/org/apache/hadoop/hbase/hbtop/TestUtils.java +++ b/hbase-hbtop/src/test/java/org/apache/hadoop/hbase/hbtop/TestUtils.java @@ -27,6 +27,7 @@ import java.util.Collections; import 
java.util.HashMap; import java.util.List; import java.util.Map; + import org.apache.commons.lang3.time.FastDateFormat; import org.apache.hadoop.hbase.ClusterMetrics; import org.apache.hadoop.hbase.ClusterMetricsBuilder; @@ -37,6 +38,8 @@ import org.apache.hadoop.hbase.ServerMetricsBuilder; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.Size; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.UserMetrics; +import org.apache.hadoop.hbase.UserMetricsBuilder; import org.apache.hadoop.hbase.client.RegionInfoBuilder; import org.apache.hadoop.hbase.hbtop.field.Field; import org.apache.hadoop.hbase.hbtop.screen.top.Summary; @@ -54,6 +57,9 @@ public final class TestUtils { // host1 List regionMetricsList = new ArrayList<>(); + List userMetricsList = new ArrayList<>(); + userMetricsList.add(createUserMetrics("FOO",1,2, 4)); + userMetricsList.add(createUserMetrics("BAR",2,3, 3)); regionMetricsList.add(createRegionMetrics( "table1,,1.00000000000000000000000000000000.", 100, 50, 100, @@ -73,10 +79,13 @@ public final class TestUtils { ServerName host1 = ServerName.valueOf("host1.apache.com", 1000, 1); serverMetricsMap.put(host1, createServerMetrics(host1, 100, new Size(100, Size.Unit.MEGABYTE), new Size(200, Size.Unit.MEGABYTE), 100, - regionMetricsList)); + regionMetricsList, userMetricsList)); // host2 regionMetricsList.clear(); + userMetricsList.clear(); + userMetricsList.add(createUserMetrics("FOO",5,7, 3)); + userMetricsList.add(createUserMetrics("BAR",4,8, 4)); regionMetricsList.add(createRegionMetrics( "table1,1,4.00000000000000000000000000000003.", 100, 50, 100, @@ -96,7 +105,7 @@ public final class TestUtils { ServerName host2 = ServerName.valueOf("host2.apache.com", 1001, 2); serverMetricsMap.put(host2, createServerMetrics(host2, 200, new Size(16, Size.Unit.GIGABYTE), new Size(32, Size.Unit.GIGABYTE), 200, - regionMetricsList)); + regionMetricsList, userMetricsList)); ServerName host3 = 
ServerName.valueOf("host3.apache.com", 1002, 3); return ClusterMetricsBuilder.newBuilder() @@ -117,6 +126,15 @@ public final class TestUtils { .build(); } + private static UserMetrics createUserMetrics(String user, long readRequestCount, + long writeRequestCount, long filteredReadRequestsCount) { + return UserMetricsBuilder.newBuilder(Bytes.toBytes(user)).addClientMetris( + new UserMetricsBuilder.ClientMetricsImpl("CLIENT_A_" + user, readRequestCount, + writeRequestCount, filteredReadRequestsCount)).addClientMetris( + new UserMetricsBuilder.ClientMetricsImpl("CLIENT_B_" + user, readRequestCount, + writeRequestCount, filteredReadRequestsCount)).build(); + } + private static RegionMetrics createRegionMetrics(String regionName, long readRequestCount, long filteredReadRequestCount, long writeRequestCount, Size storeFileSize, Size uncompressedStoreFileSize, int storeFileCount, Size memStoreSize, float locality, @@ -139,14 +157,15 @@ public final class TestUtils { private static ServerMetrics createServerMetrics(ServerName serverName, long reportTimestamp, Size usedHeapSize, Size maxHeapSize, long requestCountPerSecond, - List regionMetricsList) { + List regionMetricsList, List userMetricsList) { return ServerMetricsBuilder.newBuilder(serverName) .setReportTimestamp(reportTimestamp) .setUsedHeapSize(usedHeapSize) .setMaxHeapSize(maxHeapSize) .setRequestCountPerSecond(requestCountPerSecond) - .setRegionMetrics(regionMetricsList).build(); + .setRegionMetrics(regionMetricsList) + .setUserMetrics(userMetricsList).build(); } public static void assertRecordsInRegionMode(List records) { @@ -316,10 +335,78 @@ public final class TestUtils { } } + public static void assertRecordsInUserMode(List records) { + assertThat(records.size(), is(2)); + for (Record record : records) { + String user = record.get(Field.USER).asString(); + switch (user) { + //readRequestPerSecond and writeRequestPerSecond will be zero + // because there is no change or new metrics during refresh + case "FOO": 
+ assertRecordInUserMode(record, 0L, 0L, 0L); + break; + case "BAR": + assertRecordInUserMode(record, 0L, 0L, 0L); + break; + default: + fail(); + } + } + } + + public static void assertRecordsInClientMode(List records) { + assertThat(records.size(), is(4)); + for (Record record : records) { + String client = record.get(Field.CLIENT).asString(); + switch (client) { + //readRequestPerSecond and writeRequestPerSecond will be zero + // because there is no change or new metrics during refresh + case "CLIENT_A_FOO": + assertRecordInClientMode(record, 0L, 0L, 0L); + break; + case "CLIENT_A_BAR": + assertRecordInClientMode(record, 0L, 0L, 0L); + break; + case "CLIENT_B_FOO": + assertRecordInClientMode(record, 0L, 0L, 0L); + break; + case "CLIENT_B_BAR": + assertRecordInClientMode(record, 0L, 0L, 0L); + break; + default: + fail(); + } + } + } + + private static void assertRecordInUserMode(Record record, long readRequestCountPerSecond, + long writeCountRequestPerSecond, long filteredReadRequestsCount) { + assertThat(record.size(), is(6)); + assertThat(record.get(Field.READ_REQUEST_COUNT_PER_SECOND).asLong(), + is(readRequestCountPerSecond)); + assertThat(record.get(Field.WRITE_REQUEST_COUNT_PER_SECOND).asLong(), + is(writeCountRequestPerSecond)); + assertThat(record.get(Field.FILTERED_READ_REQUEST_COUNT_PER_SECOND).asLong(), + is(filteredReadRequestsCount)); + assertThat(record.get(Field.CLIENT_COUNT).asInt(), is(2)); + } + + private static void assertRecordInClientMode(Record record, long readRequestCountPerSecond, + long writeCountRequestPerSecond, long filteredReadRequestsCount) { + assertThat(record.size(), is(6)); + assertThat(record.get(Field.READ_REQUEST_COUNT_PER_SECOND).asLong(), + is(readRequestCountPerSecond)); + assertThat(record.get(Field.WRITE_REQUEST_COUNT_PER_SECOND).asLong(), + is(writeCountRequestPerSecond)); + assertThat(record.get(Field.FILTERED_READ_REQUEST_COUNT_PER_SECOND).asLong(), + is(filteredReadRequestsCount)); + 
assertThat(record.get(Field.USER_COUNT).asInt(), is(1)); + } + private static void assertRecordInTableMode(Record record, long requestCountPerSecond, - long readRequestCountPerSecond, long filteredReadRequestCountPerSecond, - long writeCountRequestPerSecond, Size storeFileSize, Size uncompressedStoreFileSize, - int numStoreFiles, Size memStoreSize, int regionCount) { + long readRequestCountPerSecond, long filteredReadRequestCountPerSecond, + long writeCountRequestPerSecond, Size storeFileSize, Size uncompressedStoreFileSize, + int numStoreFiles, Size memStoreSize, int regionCount) { assertThat(record.size(), is(11)); assertThat(record.get(Field.REQUEST_COUNT_PER_SECOND).asLong(), is(requestCountPerSecond)); diff --git a/hbase-hbtop/src/test/java/org/apache/hadoop/hbase/hbtop/mode/ClientModeTest.java b/hbase-hbtop/src/test/java/org/apache/hadoop/hbase/hbtop/mode/ClientModeTest.java new file mode 100644 index 00000000000..82dbe45e822 --- /dev/null +++ b/hbase-hbtop/src/test/java/org/apache/hadoop/hbase/hbtop/mode/ClientModeTest.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.hbtop.mode; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; + +import java.util.List; + +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.hbtop.Record; +import org.apache.hadoop.hbase.hbtop.TestUtils; +import org.apache.hadoop.hbase.hbtop.field.Field; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.ClassRule; +import org.junit.experimental.categories.Category; + +@Category(SmallTests.class) +public class ClientModeTest extends ModeTestBase { + + @ClassRule public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(ClientModeTest.class); + + @Override protected Mode getMode() { + return Mode.CLIENT; + } + + @Override protected void assertRecords(List records) { + TestUtils.assertRecordsInClientMode(records); + } + + @Override protected void assertDrillDown(Record currentRecord, DrillDownInfo drillDownInfo) { + assertThat(drillDownInfo.getNextMode(), is(Mode.USER)); + assertThat(drillDownInfo.getInitialFilters().size(), is(1)); + String client = currentRecord.get(Field.CLIENT).asString(); + switch (client) { + case "CLIENT_A_FOO": + assertThat(drillDownInfo.getInitialFilters().get(0).toString(), is("CLIENT==CLIENT_A_FOO")); + break; + + case "CLIENT_B_FOO": + assertThat(drillDownInfo.getInitialFilters().get(0).toString(), is("CLIENT==CLIENT_B_FOO")); + break; + + case "CLIENT_A_BAR": + assertThat(drillDownInfo.getInitialFilters().get(0).toString(), is("CLIENT==CLIENT_A_BAR")); + break; + case "CLIENT_B_BAR": + assertThat(drillDownInfo.getInitialFilters().get(0).toString(), is("CLIENT==CLIENT_B_BAR")); + break; + + default: + fail(); + } + } +} diff --git a/hbase-hbtop/src/test/java/org/apache/hadoop/hbase/hbtop/mode/ModeTestBase.java b/hbase-hbtop/src/test/java/org/apache/hadoop/hbase/hbtop/mode/ModeTestBase.java index 7ad1a3a870a..2b6db84e047 100644 --- 
a/hbase-hbtop/src/test/java/org/apache/hadoop/hbase/hbtop/mode/ModeTestBase.java +++ b/hbase-hbtop/src/test/java/org/apache/hadoop/hbase/hbtop/mode/ModeTestBase.java @@ -27,7 +27,8 @@ public abstract class ModeTestBase { @Test public void testGetRecords() { - List records = getMode().getRecords(TestUtils.createDummyClusterMetrics()); + List records = getMode().getRecords(TestUtils.createDummyClusterMetrics(), + null); assertRecords(records); } @@ -36,7 +37,8 @@ public abstract class ModeTestBase { @Test public void testDrillDown() { - List records = getMode().getRecords(TestUtils.createDummyClusterMetrics()); + List records = getMode().getRecords(TestUtils.createDummyClusterMetrics(), + null); for (Record record : records) { assertDrillDown(record, getMode().drillDown(record)); } diff --git a/hbase-hbtop/src/test/java/org/apache/hadoop/hbase/hbtop/mode/UserModeTest.java b/hbase-hbtop/src/test/java/org/apache/hadoop/hbase/hbtop/mode/UserModeTest.java new file mode 100644 index 00000000000..32e111c78d9 --- /dev/null +++ b/hbase-hbtop/src/test/java/org/apache/hadoop/hbase/hbtop/mode/UserModeTest.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.hbtop.mode; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; + +import java.util.List; + +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.hbtop.Record; +import org.apache.hadoop.hbase.hbtop.TestUtils; +import org.apache.hadoop.hbase.hbtop.field.Field; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.ClassRule; +import org.junit.experimental.categories.Category; + +@Category(SmallTests.class) +public class UserModeTest extends ModeTestBase { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(UserModeTest.class); + + @Override + protected Mode getMode() { + return Mode.USER; + } + + @Override + protected void assertRecords(List records) { + TestUtils.assertRecordsInUserMode(records); + } + + @Override + protected void assertDrillDown(Record currentRecord, DrillDownInfo drillDownInfo) { + assertThat(drillDownInfo.getNextMode(), is(Mode.CLIENT)); + assertThat(drillDownInfo.getInitialFilters().size(), is(1)); + String userName = currentRecord.get(Field.USER).asString(); + + switch (userName) { + case "FOO": + assertThat(drillDownInfo.getInitialFilters().get(0).toString(), is("USER==FOO")); + break; + + case "BAR": + assertThat(drillDownInfo.getInitialFilters().get(0).toString(), is("USER==BAR")); + break; + + default: + fail(); + } + } +} diff --git a/hbase-protocol-shaded/src/main/protobuf/ClusterStatus.proto b/hbase-protocol-shaded/src/main/protobuf/ClusterStatus.proto index 78d2e83d5c7..77b9887902b 100644 --- a/hbase-protocol-shaded/src/main/protobuf/ClusterStatus.proto +++ b/hbase-protocol-shaded/src/main/protobuf/ClusterStatus.proto @@ -157,6 +157,29 @@ message RegionLoad { optional int32 max_store_file_ref_count = 22 [default = 0]; } +message UserLoad { + + /** short user name */ + required string userName = 1; + + /** Metrics for all 
clients of a user */ + repeated ClientMetrics clientMetrics = 2; +} + +message ClientMetrics { + /** client host name */ + required string hostName = 1; + + /** the current total read requests made from a client */ + optional uint64 read_requests_count = 2; + + /** the current total write requests made from a client */ + optional uint64 write_requests_count = 3; + + /** the current total filtered requests made from a client */ + optional uint64 filtered_requests_count = 4; +} + /* Server-level protobufs */ message ReplicationLoadSink { @@ -230,6 +253,11 @@ message ServerLoad { * The replicationLoadSink for the replication Sink status of this region server. */ optional ReplicationLoadSink replLoadSink = 11; + + /** + * The metrics for each user on this region server + */ + repeated UserLoad userLoads = 12; } message LiveServerInfo { diff --git a/hbase-protocol/src/main/protobuf/ClusterStatus.proto b/hbase-protocol/src/main/protobuf/ClusterStatus.proto index 34949088008..02347386ece 100644 --- a/hbase-protocol/src/main/protobuf/ClusterStatus.proto +++ b/hbase-protocol/src/main/protobuf/ClusterStatus.proto @@ -153,6 +153,32 @@ message RegionLoad { optional int32 max_store_file_ref_count = 22 [default = 0]; } +message UserLoad { + + /** short user name */ + required string userName = 1; + + /** Metrics for all clients of a user */ + repeated ClientMetrics clientMetrics = 2; + + +} + +message ClientMetrics { + /** client host name */ + required string hostName = 1; + + /** the current total read requests made from a client */ + optional uint64 read_requests_count = 2; + + /** the current total write requests made from a client */ + optional uint64 write_requests_count = 3; + + /** the current total filtered requests made from a client */ + optional uint64 filtered_requests_count = 4; + +} + /* Server-level protobufs */ message ReplicationLoadSink { @@ -219,6 +245,11 @@ message ServerLoad { * The replicationLoadSink for the replication Sink status of this region server. 
*/ optional ReplicationLoadSink replLoadSink = 11; + + /** + * The metrics for each user on this region server + */ + repeated UserLoad userLoads = 12; } message LiveServerInfo { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index a431e5d08da..d1490c50263 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -843,7 +843,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // TODO: revisit if coprocessors should load in other cases this.coprocessorHost = new RegionCoprocessorHost(this, rsServices, conf); this.metricsRegionWrapper = new MetricsRegionWrapperImpl(this); - this.metricsRegion = new MetricsRegion(this.metricsRegionWrapper); + this.metricsRegion = new MetricsRegion(this.metricsRegionWrapper, conf); } else { this.metricsRegionWrapper = null; this.metricsRegion = null; @@ -6582,6 +6582,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi if (!outResults.isEmpty()) { readRequestsCount.increment(); + if (metricsRegion != null) { + metricsRegion.updateReadRequestCount(); + } } if (rsServices != null && rsServices.getMetrics() != null) { rsServices.getMetrics().updateReadQueryMeter(getRegionInfo().getTable()); @@ -6909,6 +6912,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi protected void incrementCountOfRowsFilteredMetric(ScannerContext scannerContext) { filteredReadRequestsCount.increment(); + if (metricsRegion != null) { + metricsRegion.updateFilteredRecords(); + } if (scannerContext == null || !scannerContext.isTrackingMetrics()) return; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java 
index 31049826e79..1c1f0b07c62 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -56,6 +56,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Function; import javax.management.MalformedObjectNameException; import javax.servlet.http.HttpServlet; + import org.apache.commons.lang3.RandomUtils; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.SystemUtils; @@ -208,6 +209,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Coprocesso import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.UserLoad; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.Coprocessor; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.Coprocessor.Builder; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair; @@ -1379,7 +1381,14 @@ public class HRegionServer extends HasThread implements } else { serverLoad.setInfoServerPort(-1); } - + MetricsUserAggregateSource userSource = + metricsRegionServer.getMetricsUserAggregate().getSource(); + if (userSource != null) { + Map userMetricMap = userSource.getUserSources(); + for (Entry entry : userMetricMap.entrySet()) { + serverLoad.addUserLoads(createUserLoad(entry.getKey(), entry.getValue())); + } + } // for the replicationLoad purpose. 
Only need to get from one executorService // either source or sink will get the same info ReplicationSourceService rsources = getReplicationSourceService(); @@ -1717,6 +1726,19 @@ public class HRegionServer extends HasThread implements return regionLoadBldr.build(); } + private UserLoad createUserLoad(String user, MetricsUserSource userSource) { + UserLoad.Builder userLoadBldr = UserLoad.newBuilder(); + userLoadBldr.setUserName(user); + userSource.getClientMetrics().values().stream().map( + clientMetrics -> ClusterStatusProtos.ClientMetrics.newBuilder() + .setHostName(clientMetrics.getHostName()) + .setWriteRequestsCount(clientMetrics.getWriteRequestsCount()) + .setFilteredRequestsCount(clientMetrics.getFilteredReadRequests()) + .setReadRequestsCount(clientMetrics.getReadRequestsCount()).build()) + .forEach(userLoadBldr::addClientMetrics); + return userLoadBldr.build(); + } + public RegionLoad createRegionLoad(final String encodedRegionName) throws IOException { HRegion r = onlineRegions.get(encodedRegionName); return r != null ? 
createRegionLoad(r, null, null) : null; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegion.java index 868d9b95d4b..a1dad021224 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegion.java @@ -18,6 +18,7 @@ */ package org.apache.hadoop.hbase.regionserver; +import org.apache.hadoop.conf.Configuration; import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.CompatibilitySingletonFactory; @@ -29,12 +30,14 @@ import org.apache.hadoop.hbase.CompatibilitySingletonFactory; @InterfaceAudience.Private public class MetricsRegion { private final MetricsRegionSource source; + private final MetricsUserAggregate userAggregate; private MetricsRegionWrapper regionWrapper; - public MetricsRegion(final MetricsRegionWrapper wrapper) { + public MetricsRegion(final MetricsRegionWrapper wrapper, Configuration conf) { source = CompatibilitySingletonFactory.getInstance(MetricsRegionServerSourceFactory.class) .createRegion(wrapper); this.regionWrapper = wrapper; + userAggregate = MetricsUserAggregateFactory.getMetricsUserAggregate(conf); } public void close() { @@ -57,6 +60,9 @@ public class MetricsRegion { source.updateScanTime(t); } + public void updateFilteredRecords(){ + userAggregate.updateFilteredReadRequests(); + } public void updateAppend() { source.updateAppend(); } @@ -73,4 +79,7 @@ public class MetricsRegion { return regionWrapper; } + public void updateReadRequestCount() { + userAggregate.updateReadRequestCount(); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java index 808fc582314..98f400ee9fb 100644 --- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java @@ -101,7 +101,7 @@ public class MetricsRegionServer { } @VisibleForTesting - public org.apache.hadoop.hbase.regionserver.MetricsUserAggregate getMetricsUserAggregate() { + public MetricsUserAggregate getMetricsUserAggregate() { return userAggregate; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregate.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregate.java index 9b23ccc4456..41d6d6b1e49 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregate.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregate.java @@ -23,6 +23,11 @@ import org.apache.yetus.audience.InterfaceAudience; @InterfaceAudience.Private public interface MetricsUserAggregate { + /** + * @return return a singleton instance of MetricsUserAggregateSource or null in case of NoOp + */ + MetricsUserAggregateSource getSource(); + void updatePut(long t); void updateDelete(long t); @@ -36,4 +41,8 @@ public interface MetricsUserAggregate { void updateReplay(long t); void updateScanTime(long t); + + void updateFilteredReadRequests(); + + void updateReadRequestCount(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateFactory.java index 888c480595d..38e440bd2a6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateFactory.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateFactory.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hbase.regionserver; - import org.apache.hadoop.conf.Configuration; import 
org.apache.yetus.audience.InterfaceAudience; @@ -27,6 +26,7 @@ public class MetricsUserAggregateFactory { private MetricsUserAggregateFactory() { } + public static final String METRIC_USER_ENABLED_CONF = "hbase.regionserver.user.metrics.enabled"; public static final boolean DEFAULT_METRIC_USER_ENABLED_CONF = true; @@ -36,6 +36,10 @@ public class MetricsUserAggregateFactory { } else { //NoOpMetricUserAggregate return new MetricsUserAggregate() { + @Override public MetricsUserAggregateSource getSource() { + return null; + } + @Override public void updatePut(long t) { } @@ -63,6 +67,14 @@ public class MetricsUserAggregateFactory { @Override public void updateScanTime(long t) { } + + @Override public void updateFilteredReadRequests() { + + } + + @Override public void updateReadRequestCount() { + + } }; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateImpl.java index 6c24afc6ba3..b457c75affe 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateImpl.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; +import java.net.InetAddress; import java.util.Optional; import org.apache.hadoop.conf.Configuration; @@ -29,8 +30,6 @@ import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.util.LossyCounting; import org.apache.yetus.audience.InterfaceAudience; -import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; - @InterfaceAudience.Private public class MetricsUserAggregateImpl implements MetricsUserAggregate{ @@ -65,8 +64,8 @@ public class MetricsUserAggregateImpl implements MetricsUserAggregate{ return user.isPresent() ? 
user.get().getShortName() : null; } - @VisibleForTesting - MetricsUserAggregateSource getSource() { + @Override + public MetricsUserAggregateSource getSource() { return source; } @@ -74,7 +73,39 @@ public class MetricsUserAggregateImpl implements MetricsUserAggregate{ public void updatePut(long t) { String user = getActiveUser(); if (user != null) { - getOrCreateMetricsUser(user).updatePut(t); + MetricsUserSource userSource = getOrCreateMetricsUser(user); + userSource.updatePut(t); + incrementClientWriteMetrics(userSource); + } + + } + + private String getClient() { + Optional ipOptional = RpcServer.getRemoteAddress(); + if (ipOptional.isPresent()) { + return ipOptional.get().getHostName(); + } + return null; + } + + private void incrementClientReadMetrics(MetricsUserSource userSource) { + String client = getClient(); + if (client != null && userSource != null) { + userSource.getOrCreateMetricsClient(client).incrementReadRequest(); + } + } + + private void incrementFilteredReadRequests(MetricsUserSource userSource) { + String client = getClient(); + if (client != null && userSource != null) { + userSource.getOrCreateMetricsClient(client).incrementFilteredReadRequests(); + } + } + + private void incrementClientWriteMetrics(MetricsUserSource userSource) { + String client = getClient(); + if (client != null && userSource != null) { + userSource.getOrCreateMetricsClient(client).incrementWriteRequest(); } } @@ -82,7 +113,9 @@ public class MetricsUserAggregateImpl implements MetricsUserAggregate{ public void updateDelete(long t) { String user = getActiveUser(); if (user != null) { - getOrCreateMetricsUser(user).updateDelete(t); + MetricsUserSource userSource = getOrCreateMetricsUser(user); + userSource.updateDelete(t); + incrementClientWriteMetrics(userSource); } } @@ -90,7 +123,8 @@ public class MetricsUserAggregateImpl implements MetricsUserAggregate{ public void updateGet(long t) { String user = getActiveUser(); if (user != null) { - 
getOrCreateMetricsUser(user).updateGet(t); + MetricsUserSource userSource = getOrCreateMetricsUser(user); + userSource.updateGet(t); } } @@ -98,7 +132,9 @@ public class MetricsUserAggregateImpl implements MetricsUserAggregate{ public void updateIncrement(long t) { String user = getActiveUser(); if (user != null) { - getOrCreateMetricsUser(user).updateIncrement(t); + MetricsUserSource userSource = getOrCreateMetricsUser(user); + userSource.updateIncrement(t); + incrementClientWriteMetrics(userSource); } } @@ -106,7 +142,9 @@ public class MetricsUserAggregateImpl implements MetricsUserAggregate{ public void updateAppend(long t) { String user = getActiveUser(); if (user != null) { - getOrCreateMetricsUser(user).updateAppend(t); + MetricsUserSource userSource = getOrCreateMetricsUser(user); + userSource.updateAppend(t); + incrementClientWriteMetrics(userSource); } } @@ -114,7 +152,9 @@ public class MetricsUserAggregateImpl implements MetricsUserAggregate{ public void updateReplay(long t) { String user = getActiveUser(); if (user != null) { - getOrCreateMetricsUser(user).updateReplay(t); + MetricsUserSource userSource = getOrCreateMetricsUser(user); + userSource.updateReplay(t); + incrementClientWriteMetrics(userSource); } } @@ -122,7 +162,24 @@ public class MetricsUserAggregateImpl implements MetricsUserAggregate{ public void updateScanTime(long t) { String user = getActiveUser(); if (user != null) { - getOrCreateMetricsUser(user).updateScanTime(t); + MetricsUserSource userSource = getOrCreateMetricsUser(user); + userSource.updateScanTime(t); + } + } + + @Override public void updateFilteredReadRequests() { + String user = getActiveUser(); + if (user != null) { + MetricsUserSource userSource = getOrCreateMetricsUser(user); + incrementFilteredReadRequests(userSource); + } + } + + @Override public void updateReadRequestCount() { + String user = getActiveUser(); + if (user != null) { + MetricsUserSource userSource = getOrCreateMetricsUser(user); + 
incrementClientReadMetrics(userSource); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestClientClusterMetrics.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestClientClusterMetrics.java index e6923516d4d..0f4a35b4cd9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestClientClusterMetrics.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestClientClusterMetrics.java @@ -18,28 +18,39 @@ package org.apache.hadoop.hbase; import java.io.IOException; +import java.security.PrivilegedAction; import java.util.EnumSet; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicInteger; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.ClusterMetrics.Option; import org.apache.hadoop.hbase.Waiter.Predicate; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.AsyncAdmin; import org.apache.hadoop.hbase.client.AsyncConnection; +import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.RegionInfoBuilder; import org.apache.hadoop.hbase.client.RegionStatesCount; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor; import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment; import org.apache.hadoop.hbase.coprocessor.MasterObserver; import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.filter.FilterAllFilter; import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.regionserver.HRegionServer; +import 
org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread; @@ -238,6 +249,137 @@ public class TestClientClusterMetrics { Assert.assertEquals(MASTERS - 1, metrics.getBackupMasterNames().size()); } + @Test public void testUserMetrics() throws Exception { + Configuration conf = UTIL.getConfiguration(); + User userFoo = User.createUserForTesting(conf, "FOO_USER_METRIC_TEST", new String[0]); + User userBar = User.createUserForTesting(conf, "BAR_USER_METRIC_TEST", new String[0]); + User userTest = User.createUserForTesting(conf, "TEST_USER_METRIC_TEST", new String[0]); + UTIL.createTable(TABLE_NAME, CF); + waitForUsersMetrics(0); + long writeMetaMetricBeforeNextuser = getMetaMetrics().getWriteRequestCount(); + userFoo.runAs(new PrivilegedAction() { + @Override public Void run() { + try { + doPut(); + } catch (IOException e) { + Assert.fail("Exception:" + e.getMessage()); + } + return null; + } + }); + waitForUsersMetrics(1); + long writeMetaMetricForUserFoo = + getMetaMetrics().getWriteRequestCount() - writeMetaMetricBeforeNextuser; + long readMetaMetricBeforeNextuser = getMetaMetrics().getReadRequestCount(); + userBar.runAs(new PrivilegedAction() { + @Override public Void run() { + try { + doGet(); + } catch (IOException e) { + Assert.fail("Exception:" + e.getMessage()); + } + return null; + } + }); + waitForUsersMetrics(2); + long readMetaMetricForUserBar = + getMetaMetrics().getReadRequestCount() - readMetaMetricBeforeNextuser; + long filteredMetaReqeust = getMetaMetrics().getFilteredReadRequestCount(); + userTest.runAs(new PrivilegedAction() { + @Override public Void run() { + try { + Table table = createConnection(UTIL.getConfiguration()).getTable(TABLE_NAME); + for (Result result : table.getScanner(new Scan().setFilter(new FilterAllFilter()))) { + 
Assert.fail("Should have filtered all rows"); + } + } catch (IOException e) { + Assert.fail("Exception:" + e.getMessage()); + } + return null; + } + }); + waitForUsersMetrics(3); + long filteredMetaReqeustForTestUser = + getMetaMetrics().getFilteredReadRequestCount() - filteredMetaReqeust; + Map userMap = + ADMIN.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)).getLiveServerMetrics().values() + .iterator().next().getUserMetrics(); + for (byte[] user : userMap.keySet()) { + switch (Bytes.toString(user)) { + case "FOO_USER_METRIC_TEST": + Assert.assertEquals(1, + userMap.get(user).getWriteRequestCount() - writeMetaMetricForUserFoo); + break; + case "BAR_USER_METRIC_TEST": + Assert + .assertEquals(1, userMap.get(user).getReadRequestCount() - readMetaMetricForUserBar); + Assert.assertEquals(0, userMap.get(user).getWriteRequestCount()); + break; + case "TEST_USER_METRIC_TEST": + Assert.assertEquals(1, + userMap.get(user).getFilteredReadRequests() - filteredMetaReqeustForTestUser); + Assert.assertEquals(0, userMap.get(user).getWriteRequestCount()); + break; + default: + //current user + Assert.assertEquals(UserProvider.instantiate(conf).getCurrent().getName(), + Bytes.toString(user)); + //Read/write count because of Meta operations + Assert.assertTrue(userMap.get(user).getReadRequestCount() > 1); + break; + } + } + UTIL.deleteTable(TABLE_NAME); + } + + private RegionMetrics getMetaMetrics() throws IOException { + for (ServerMetrics serverMetrics : ADMIN.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)) + .getLiveServerMetrics().values()) { + RegionMetrics metaMetrics = serverMetrics.getRegionMetrics() + .get(RegionInfoBuilder.FIRST_META_REGIONINFO.getRegionName()); + if (metaMetrics != null) { + return metaMetrics; + } + } + Assert.fail("Should have find meta metrics"); + return null; + } + + private void waitForUsersMetrics(int noOfUsers) throws Exception { + //Sleep for metrics to get updated on master + Thread.sleep(5000); + 
Waiter.waitFor(CLUSTER.getConfiguration(), 10 * 1000, 100, new Predicate() { + @Override public boolean evaluate() throws Exception { + Map metrics = + ADMIN.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)).getLiveServerMetrics().values() + .iterator().next().getUserMetrics(); + Assert.assertNotNull(metrics); + //including current user + noOfUsers + return metrics.keySet().size() > noOfUsers; + } + }); + } + + private void doPut() throws IOException { + try (Connection conn = createConnection(UTIL.getConfiguration())) { + Table table = conn.getTable(TABLE_NAME); + table + .put(new Put(Bytes.toBytes("a")).addColumn(CF, Bytes.toBytes("col1"), Bytes.toBytes("1"))); + } + } + + private void doGet() throws IOException { + try (Connection conn = createConnection(UTIL.getConfiguration())) { + Table table = conn.getTable(TABLE_NAME); + table.get(new Get(Bytes.toBytes("a")).addColumn(CF, Bytes.toBytes("col1"))); + } + } + + private Connection createConnection(Configuration conf) throws IOException { + User user = UserProvider.instantiate(conf).getCurrent(); + return ConnectionFactory.createConnection(conf, user); + } + @Test public void testOtherStatusInfos() throws Exception { EnumSet