HBASE-7262 Move HBaseRPC metrics to metrics2

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1419962 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
eclark 2012-12-11 01:12:46 +00:00
parent 5184c87734
commit a23ecdae14
33 changed files with 1102 additions and 531 deletions

View File

@ -0,0 +1,72 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import org.apache.hadoop.hbase.metrics.BaseSource;
public interface MetricsHBaseServerSource extends BaseSource {
public static final String AUTHORIZATION_SUCCESSES_NAME = "authorizationSuccesses";
public static final String AUTHORIZATION_SUCCESSES_DESC =
"Number of authorization successes.";
public static final String AUTHORIZATION_FAILURES_NAME = "authorizationFailures";
public static final String AUTHORIZATION_FAILURES_DESC =
"Number of authorization failures.";
public static final String AUTHENTICATION_SUCCESSES_NAME = "authenticationSuccesses";
public static final String AUTHENTICATION_SUCCESSES_DESC =
"Number of authentication successes.";
public static final String AUTHENTICATION_FAILURES_NAME = "authenticationFailures";
public static final String AUTHENTICATION_FAILURES_DESC =
"Number of authentication failures.";
public static final String SENT_BYTES_NAME = "sentBytes";
public static final String SENT_BYTES_DESC = "Number of bytes sent.";
public static final String RECEIVED_BYTES_NAME = "receivedBytes";
public static final String RECEIVED_BYTES_DESC = "Number of bytes received.";
public static final String QUEUE_CALL_TIME_NAME = "queueCallTime";
public static final String QUEUE_CALL_TIME_DESC = "Queue Call Time.";
public static final String PROCESS_CALL_TIME_NAME = "processCallTime";
public static final String PROCESS_CALL_TIME_DESC = "Processing call time.";
public static final String QUEUE_SIZE_NAME = "queueSize";
public static final String QUEUE_SIZE_DESC = "Number of bytes in the call queues.";
public static final String GENERAL_QUEUE_NAME = "numCallsInGeneralQueue";
public static final String GENERAL_QUEUE_DESC = "Number of calls in the general call queue.";
public static final String PRIORITY_QUEUE_NAME = "numCallsInPriorityQueue";
public static final String REPLICATION_QUEUE_NAME = "numCallsInReplicationQueue";
public static final String REPLICATION_QUEUE_DESC =
"Number of calls in the replication call queue.";
public static final String PRIORITY_QUEUE_DESC = "Number of calls in the priority call queue.";
public static final String NUM_OPEN_CONNECTIONS_NAME = "numOpenConnections";
public static final String NUM_OPEN_CONNECTIONS_DESC = "Number of open connections.";
void authorizationSuccess();
void authorizationFailure();
void authenticationSuccess();
void authenticationFailure();
void sentBytes(int count);
void receivedBytes(int count);
void dequeuedCall(int qTime);
void processedCall(int processingTime);
}

View File

@ -0,0 +1,57 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
public abstract class MetricsHBaseServerSourceFactory {
/**
* The name of the metrics
*/
static final String METRICS_NAME = "IPC";
/**
* Description
*/
static final String METRICS_DESCRIPTION = "Metrics about HBase Server IPC";
/**
* The Suffix of the JMX Context that a MetricsHBaseServerSource will register under.
*
* JMX_CONTEXT will be created by createContextName(serverClassName) + METRICS_JMX_CONTEXT_SUFFIX
*/
static final String METRICS_JMX_CONTEXT_SUFFIX = ",sub=" + METRICS_NAME;
abstract MetricsHBaseServerSource create(String serverName, MetricsHBaseServerWrapper wrapper);
/**
* From the name of the class that's starting up create the
* context that an IPC source should register itself.
*
* @param serverName The name of the class that's starting up.
* @return The Camel Cased context name.
*/
protected static String createContextName(String serverName) {
if (serverName.contains("HMaster")) {
return "Master";
} else if (serverName.contains("HRegion")) {
return "RegionServer";
}
return "IPC";
}
}

View File

@ -0,0 +1,28 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
public interface MetricsHBaseServerWrapper {
long getTotalQueueSize();
int getGeneralQueueLength();
int getReplicationQueueLength();
int getPriorityQueueLength();
int getNumOpenConnections();
}

View File

@ -87,4 +87,29 @@ public interface BaseSource {
*/
void updateQuantile(String name, long value);
/**
* Get the metrics context. For hadoop metrics2 system this is usually an all lowercased string.
* eg. regionserver, master, thriftserver
*
* @return The string context used to register this source to hadoop's metrics2 system.
*/
String getMetricsContext();
/**
* Get the description of what this source exposes.
*/
String getMetricsDescription();
/**
* Get the name of the context in JMX that this source will be exposed through.
* This is in ObjectName format. With the default context being Hadoop -> HBase
*/
String getMetricsJmxContext();
/**
* Get the name of the metrics that are being exported by this source.
* Eg. IPC, GC, WAL
*/
String getMetricsName();
}

View File

@ -80,6 +80,31 @@ public interface MetricsRegionServerSource extends BaseSource {
*/
void updateAppend(long t);
/**
* Increment the number of slow Puts that have happened.
*/
void incrSlowPut();
/**
* Increment the number of slow Deletes that have happened.
*/
void incrSlowDelete();
/**
* Increment the number of slow Gets that have happened.
*/
void incrSlowGet();
/**
* Increment the number of slow Increments that have happened.
*/
void incrSlowIncrement();
/**
* Increment the number of slow Appends that have happened.
*/
void incrSlowAppend();
// Strings used for exporting to metrics system.
static final String REGION_COUNT = "regionCount";
static final String REGION_COUNT_DESC = "Number of regions";
@ -161,6 +186,22 @@ public interface MetricsRegionServerSource extends BaseSource {
static final String DELETE_KEY = "delete";
static final String GET_KEY = "get";
static final String INCREMENT_KEY = "increment";
static final String PUT_KEY = "multiput";
static final String MUTATE_KEY = "mutate";
static final String APPEND_KEY = "append";
static final String SLOW_MUTATE_KEY = "slowPutCount";
static final String SLOW_GET_KEY = "slowGetCount";
static final String SLOW_DELETE_KEY = "slowDeleteCount";
static final String SLOW_INCREMENT_KEY = "slowIncrementCount";
static final String SLOW_APPEND_KEY = "slowAppendCount";
static final String SLOW_MUTATE_DESC =
"The number of Multis that took over 1000ms to complete";
static final String SLOW_DELETE_DESC =
"The number of Deletes that took over 1000ms to complete";
static final String SLOW_GET_DESC = "The number of Gets that took over 1000ms to complete";
static final String SLOW_INCREMENT_DESC =
"The number of Increments that took over 1000ms to complete";
static final String SLOW_APPEND_DESC =
"The number of Appends that took over 1000ms to complete";
}

View File

@ -0,0 +1,59 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import java.util.HashMap;
public class MetricsHBaseServerSourceFactoryImpl extends MetricsHBaseServerSourceFactory {
private static enum SourceStorage {
INSTANCE;
HashMap<String, MetricsHBaseServerSource>
sources =
new HashMap<String, MetricsHBaseServerSource>();
}
@Override
public MetricsHBaseServerSource create(String serverName, MetricsHBaseServerWrapper wrapper) {
return getSource(serverName, wrapper);
}
private static synchronized MetricsHBaseServerSource getSource(String serverName,
MetricsHBaseServerWrapper wrapper) {
String context = createContextName(serverName);
MetricsHBaseServerSource source = SourceStorage.INSTANCE.sources.get(context);
if (source == null) {
//Create the source.
source = new MetricsHBaseServerSourceImpl(
METRICS_NAME,
METRICS_DESCRIPTION,
context.toLowerCase(),
context + METRICS_JMX_CONTEXT_SUFFIX, wrapper);
//Store back in storage
SourceStorage.INSTANCE.sources.put(context, source);
}
return source;
}
}

View File

@ -0,0 +1,124 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import org.apache.hadoop.hbase.metrics.BaseSourceImpl;
import org.apache.hadoop.metrics2.MetricsBuilder;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.lib.MetricMutableCounterLong;
import org.apache.hadoop.metrics2.lib.MetricMutableHistogram;
public class MetricsHBaseServerSourceImpl extends BaseSourceImpl
implements MetricsHBaseServerSource {
private final MetricsHBaseServerWrapper wrapper;
private final MetricMutableCounterLong authorizationSuccesses;
private final MetricMutableCounterLong authorizationFailures;
private final MetricMutableCounterLong authenticationSuccesses;
private final MetricMutableCounterLong authenticationFailures;
private final MetricMutableCounterLong sentBytes;
private final MetricMutableCounterLong receivedBytes;
private MetricMutableHistogram queueCallTime;
private MetricMutableHistogram processCallTime;
public MetricsHBaseServerSourceImpl(String metricsName,
String metricsDescription,
String metricsContext,
String metricsJmxContext,
MetricsHBaseServerWrapper wrapper) {
super(metricsName, metricsDescription, metricsContext, metricsJmxContext);
this.wrapper = wrapper;
this.authorizationSuccesses = this.getMetricsRegistry().newCounter(AUTHORIZATION_SUCCESSES_NAME,
AUTHORIZATION_SUCCESSES_DESC, 0l);
this.authorizationFailures = this.getMetricsRegistry().newCounter(AUTHORIZATION_FAILURES_NAME,
AUTHORIZATION_FAILURES_DESC, 0l);
this.authenticationSuccesses = this.getMetricsRegistry().newCounter(
AUTHENTICATION_SUCCESSES_NAME, AUTHENTICATION_SUCCESSES_DESC, 0l);
this.authenticationFailures = this.getMetricsRegistry().newCounter(AUTHENTICATION_FAILURES_NAME,
AUTHENTICATION_FAILURES_DESC, 0l);
this.sentBytes = this.getMetricsRegistry().newCounter(SENT_BYTES_NAME,
SENT_BYTES_DESC, 0l);
this.receivedBytes = this.getMetricsRegistry().newCounter(RECEIVED_BYTES_NAME,
RECEIVED_BYTES_DESC, 0l);
this.queueCallTime = this.getMetricsRegistry().newHistogram(QUEUE_CALL_TIME_NAME,
QUEUE_CALL_TIME_DESC);
this.processCallTime = this.getMetricsRegistry().newHistogram(PROCESS_CALL_TIME_NAME,
PROCESS_CALL_TIME_DESC);
}
@Override
public void authorizationSuccess() {
authorizationSuccesses.incr();
}
@Override
public void authorizationFailure() {
authorizationFailures.incr();
}
@Override
public void authenticationFailure() {
authenticationFailures.incr();
}
@Override
public void authenticationSuccess() {
authenticationSuccesses.incr();
}
@Override
public void sentBytes(int count) {
this.sentBytes.incr(count);
}
@Override
public void receivedBytes(int count) {
this.receivedBytes.incr(count);
}
@Override
public void dequeuedCall(int qTime) {
queueCallTime.add(qTime);
}
@Override
public void processedCall(int processingTime) {
processCallTime.add(processingTime);
}
@Override
public void getMetrics(MetricsBuilder metricsBuilder, boolean all) {
MetricsRecordBuilder mrb = metricsBuilder.addRecord(metricsName)
.setContext(metricsContext);
if (wrapper != null) {
mrb.addGauge(QUEUE_SIZE_NAME, QUEUE_SIZE_DESC, wrapper.getTotalQueueSize())
.addGauge(GENERAL_QUEUE_NAME, GENERAL_QUEUE_DESC, wrapper.getGeneralQueueLength())
.addGauge(REPLICATION_QUEUE_NAME,
REPLICATION_QUEUE_DESC, wrapper.getReplicationQueueLength())
.addGauge(PRIORITY_QUEUE_NAME, PRIORITY_QUEUE_DESC, wrapper.getPriorityQueueLength())
.addGauge(NUM_OPEN_CONNECTIONS_NAME,
NUM_OPEN_CONNECTIONS_DESC, wrapper.getNumOpenConnections());
}
metricsRegistry.snapshot(mrb, all);
}
}

View File

@ -165,4 +165,20 @@ public class BaseSourceImpl implements BaseSource, MetricsSource {
public DynamicMetricsRegistry getMetricsRegistry() {
return metricsRegistry;
}
public String getMetricsContext() {
return metricsContext;
}
public String getMetricsDescription() {
return metricsDescription;
}
public String getMetricsJmxContext() {
return metricsJmxContext;
}
public String getMetricsName() {
return metricsName;
}
}

View File

@ -22,6 +22,7 @@ import org.apache.hadoop.hbase.metrics.BaseSourceImpl;
import org.apache.hadoop.metrics2.MetricHistogram;
import org.apache.hadoop.metrics2.MetricsBuilder;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.lib.MetricMutableCounterLong;
/**
* Hadoop1 implementation of MetricsRegionServerSource.
@ -37,6 +38,11 @@ public class MetricsRegionServerSourceImpl
private final MetricHistogram getHisto;
private final MetricHistogram incrementHisto;
private final MetricHistogram appendHisto;
private final MetricMutableCounterLong slowPut;
private final MetricMutableCounterLong slowDelete;
private final MetricMutableCounterLong slowGet;
private final MetricMutableCounterLong slowIncrement;
private final MetricMutableCounterLong slowAppend;
public MetricsRegionServerSourceImpl(MetricsRegionServerWrapper rsWrap) {
this(METRICS_NAME, METRICS_DESCRIPTION, METRICS_CONTEXT, METRICS_JMX_CONTEXT, rsWrap);
@ -50,16 +56,20 @@ public class MetricsRegionServerSourceImpl
super(metricsName, metricsDescription, metricsContext, metricsJmxContext);
this.rsWrap = rsWrap;
putHisto = getMetricsRegistry().getHistogram(PUT_KEY);
deleteHisto = getMetricsRegistry().getHistogram(DELETE_KEY);
getHisto = getMetricsRegistry().getHistogram(GET_KEY);
incrementHisto = getMetricsRegistry().getHistogram(INCREMENT_KEY);
appendHisto = getMetricsRegistry().getHistogram(APPEND_KEY);
}
putHisto = getMetricsRegistry().newHistogram(MUTATE_KEY);
slowPut = getMetricsRegistry().newCounter(SLOW_MUTATE_KEY, SLOW_MUTATE_DESC, 0l);
@Override
public void init() {
super.init();
deleteHisto = getMetricsRegistry().newHistogram(DELETE_KEY);
slowDelete = getMetricsRegistry().newCounter(SLOW_DELETE_KEY, SLOW_DELETE_DESC, 0l);
getHisto = getMetricsRegistry().newHistogram(GET_KEY);
slowGet = getMetricsRegistry().newCounter(SLOW_GET_KEY, SLOW_GET_DESC, 0l);
incrementHisto = getMetricsRegistry().newHistogram(INCREMENT_KEY);
slowIncrement = getMetricsRegistry().newCounter(SLOW_INCREMENT_KEY, SLOW_INCREMENT_DESC, 0l);
appendHisto = getMetricsRegistry().newHistogram(APPEND_KEY);
slowAppend = getMetricsRegistry().newCounter(SLOW_APPEND_KEY, SLOW_APPEND_DESC, 0l);
}
@Override
@ -87,6 +97,31 @@ public class MetricsRegionServerSourceImpl
appendHisto.add(t);
}
@Override
public void incrSlowPut() {
slowPut.incr();
}
@Override
public void incrSlowDelete() {
slowDelete.incr();
}
@Override
public void incrSlowGet() {
slowGet.incr();
}
@Override
public void incrSlowIncrement() {
slowIncrement.incr();
}
@Override
public void incrSlowAppend() {
slowAppend.incr();
}
/**
* Yes this is a get function that doesn't return anything. Thanks Hadoop for breaking all
* expectations of java programmers. Instead of returning anything Hadoop metrics expects

View File

@ -64,7 +64,7 @@ public class MetricsRegionSourceImpl implements MetricsRegionSource {
String suffix = "Count";
regionPutKey = regionNamePrefix + MetricsRegionServerSource.PUT_KEY + suffix;
regionPutKey = regionNamePrefix + MetricsRegionServerSource.MUTATE_KEY + suffix;
regionPut = registry.getLongCounter(regionPutKey, 0l);
regionDeleteKey = regionNamePrefix + MetricsRegionServerSource.DELETE_KEY + suffix;

View File

@ -0,0 +1 @@
org.apache.hadoop.hbase.ipc.MetricsHBaseServerSourceFactoryImpl

View File

@ -23,6 +23,7 @@ import org.apache.hadoop.hbase.metrics.BaseSourceImpl;
import org.apache.hadoop.metrics2.Metric;
import org.apache.hadoop.metrics2.MetricsBuilder;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.MetricsSource;
import org.apache.hadoop.metrics2.MetricsTag;
import java.util.HashMap;
@ -174,7 +175,7 @@ public class MetricsAssertHelperImpl implements MetricsAssertHelper {
public long getCounter(String name, BaseSource source) {
getMetrics(source);
String cName = canonicalizeMetricName(name);
assertNotNull(counters.get(cName));
assertNotNull("Should get counter "+cName + " but did not",counters.get(cName));
return counters.get(cName).longValue();
}
@ -182,7 +183,7 @@ public class MetricsAssertHelperImpl implements MetricsAssertHelper {
public double getGaugeDouble(String name, BaseSource source) {
getMetrics(source);
String cName = canonicalizeMetricName(name);
assertNotNull(gauges.get(cName));
assertNotNull("Should get gauge "+cName + " but did not",gauges.get(cName));
return gauges.get(cName).doubleValue();
}
@ -190,8 +191,8 @@ public class MetricsAssertHelperImpl implements MetricsAssertHelper {
public long getGaugeLong(String name, BaseSource source) {
getMetrics(source);
String cName = canonicalizeMetricName(name);
assertNotNull(gauges.get(cName));
return gauges.get(cName).longValue();
assertNotNull("Should get gauge " + cName + " but did not", gauges.get(cName));
return gauges.get(cName).longValue();
}
private void reset() {
@ -202,10 +203,10 @@ public class MetricsAssertHelperImpl implements MetricsAssertHelper {
private void getMetrics(BaseSource source) {
reset();
if (!(source instanceof BaseSourceImpl)) {
assertTrue(false);
if (!(source instanceof MetricsSource)) {
assertTrue("The Source passed must be a MetricsSource", false);
}
BaseSourceImpl impl = (BaseSourceImpl) source;
MetricsSource impl = (MetricsSource) source;
impl.getMetrics(new MockMetricsBuilder(), true);

View File

@ -0,0 +1,59 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import java.util.HashMap;
public class MetricsHBaseServerSourceFactoryImpl extends MetricsHBaseServerSourceFactory {
private static enum SourceStorage {
INSTANCE;
HashMap<String, MetricsHBaseServerSource>
sources =
new HashMap<String, MetricsHBaseServerSource>();
}
@Override
public MetricsHBaseServerSource create(String serverName, MetricsHBaseServerWrapper wrapper) {
return getSource(serverName, wrapper);
}
private static synchronized MetricsHBaseServerSource getSource(String serverName,
MetricsHBaseServerWrapper wrapper) {
String context = createContextName(serverName);
MetricsHBaseServerSource source = SourceStorage.INSTANCE.sources.get(context);
if (source == null) {
//Create the source.
source = new MetricsHBaseServerSourceImpl(
METRICS_NAME,
METRICS_DESCRIPTION,
context.toLowerCase(),
context + METRICS_JMX_CONTEXT_SUFFIX, wrapper);
//Store back in storage
SourceStorage.INSTANCE.sources.put(context, source);
}
return source;
}
}

View File

@ -0,0 +1,130 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import org.apache.hadoop.hbase.metrics.BaseSourceImpl;
import org.apache.hadoop.metrics2.MetricsCollector;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.lib.Interns;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import org.apache.hadoop.metrics2.lib.MutableHistogram;
public class MetricsHBaseServerSourceImpl extends BaseSourceImpl
implements MetricsHBaseServerSource {
private final MetricsHBaseServerWrapper wrapper;
private final MutableCounterLong authorizationSuccesses;
private final MutableCounterLong authorizationFailures;
private final MutableCounterLong authenticationSuccesses;
private final MutableCounterLong authenticationFailures;
private final MutableCounterLong sentBytes;
private final MutableCounterLong receivedBytes;
private MutableHistogram queueCallTime;
private MutableHistogram processCallTime;
public MetricsHBaseServerSourceImpl(String metricsName,
String metricsDescription,
String metricsContext,
String metricsJmxContext,
MetricsHBaseServerWrapper wrapper) {
super(metricsName, metricsDescription, metricsContext, metricsJmxContext);
this.wrapper = wrapper;
this.authorizationSuccesses = this.getMetricsRegistry().newCounter(AUTHORIZATION_SUCCESSES_NAME,
AUTHORIZATION_SUCCESSES_DESC, 0l);
this.authorizationFailures = this.getMetricsRegistry().newCounter(AUTHORIZATION_FAILURES_NAME,
AUTHORIZATION_FAILURES_DESC, 0l);
this.authenticationSuccesses = this.getMetricsRegistry().newCounter(
AUTHENTICATION_SUCCESSES_NAME, AUTHENTICATION_SUCCESSES_DESC, 0l);
this.authenticationFailures = this.getMetricsRegistry().newCounter(AUTHENTICATION_FAILURES_NAME,
AUTHENTICATION_FAILURES_DESC, 0l);
this.sentBytes = this.getMetricsRegistry().newCounter(SENT_BYTES_NAME,
SENT_BYTES_DESC, 0l);
this.receivedBytes = this.getMetricsRegistry().newCounter(RECEIVED_BYTES_NAME,
RECEIVED_BYTES_DESC, 0l);
this.queueCallTime = this.getMetricsRegistry().newHistogram(QUEUE_CALL_TIME_NAME,
QUEUE_CALL_TIME_DESC);
this.processCallTime = this.getMetricsRegistry().newHistogram(PROCESS_CALL_TIME_NAME,
PROCESS_CALL_TIME_DESC);
}
@Override
public void authorizationSuccess() {
authorizationSuccesses.incr();
}
@Override
public void authorizationFailure() {
authorizationFailures.incr();
}
@Override
public void authenticationFailure() {
authenticationFailures.incr();
}
@Override
public void authenticationSuccess() {
authenticationSuccesses.incr();
}
@Override
public void sentBytes(int count) {
this.sentBytes.incr(count);
}
@Override
public void receivedBytes(int count) {
this.receivedBytes.incr(count);
}
@Override
public void dequeuedCall(int qTime) {
queueCallTime.add(qTime);
}
@Override
public void processedCall(int processingTime) {
processCallTime.add(processingTime);
}
@Override
public void getMetrics(MetricsCollector metricsCollector, boolean all) {
metricsRegistry.snapshot(metricsCollector.addRecord(metricsRegistry.info()), all);
MetricsRecordBuilder mrb = metricsCollector.addRecord(metricsName)
.setContext(metricsContext);
if (wrapper != null) {
mrb.addGauge(Interns.info(QUEUE_SIZE_NAME, QUEUE_SIZE_DESC), wrapper.getTotalQueueSize())
.addGauge(Interns.info(GENERAL_QUEUE_NAME, GENERAL_QUEUE_DESC),
wrapper.getGeneralQueueLength())
.addGauge(Interns.info(REPLICATION_QUEUE_NAME,
REPLICATION_QUEUE_DESC), wrapper.getReplicationQueueLength())
.addGauge(Interns.info(PRIORITY_QUEUE_NAME, PRIORITY_QUEUE_DESC),
wrapper.getPriorityQueueLength())
.addGauge(Interns.info(NUM_OPEN_CONNECTIONS_NAME,
NUM_OPEN_CONNECTIONS_DESC), wrapper.getNumOpenConnections());
}
metricsRegistry.snapshot(mrb, all);
}
}

View File

@ -148,14 +148,29 @@ public class BaseSourceImpl implements BaseSource, MetricsSource {
JmxCacheBuster.clearJmxCache();
}
public DynamicMetricsRegistry getMetricsRegistry() {
return metricsRegistry;
}
@Override
public void getMetrics(MetricsCollector metricsCollector, boolean all) {
metricsRegistry.snapshot(metricsCollector.addRecord(metricsRegistry.info()), all);
}
public DynamicMetricsRegistry getMetricsRegistry() {
return metricsRegistry;
}
public String getMetricsContext() {
return metricsContext;
}
public String getMetricsDescription() {
return metricsDescription;
}
public String getMetricsJmxContext() {
return metricsJmxContext;
}
public String getMetricsName() {
return metricsName;
}
}

View File

@ -23,6 +23,7 @@ import org.apache.hadoop.metrics2.MetricHistogram;
import org.apache.hadoop.metrics2.MetricsCollector;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.lib.Interns;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
/**
* Hadoop2 implementation of MetricsRegionServerSource.
@ -32,6 +33,8 @@ import org.apache.hadoop.metrics2.lib.Interns;
public class MetricsRegionServerSourceImpl
extends BaseSourceImpl implements MetricsRegionServerSource {
final MetricsRegionServerWrapper rsWrap;
private final MetricHistogram putHisto;
private final MetricHistogram deleteHisto;
@ -39,6 +42,13 @@ public class MetricsRegionServerSourceImpl
private final MetricHistogram incrementHisto;
private final MetricHistogram appendHisto;
private final MutableCounterLong slowPut;
private final MutableCounterLong slowDelete;
private final MutableCounterLong slowGet;
private final MutableCounterLong slowIncrement;
private final MutableCounterLong slowAppend;
public MetricsRegionServerSourceImpl(MetricsRegionServerWrapper rsWrap) {
this(METRICS_NAME, METRICS_DESCRIPTION, METRICS_CONTEXT, METRICS_JMX_CONTEXT, rsWrap);
}
@ -51,16 +61,20 @@ public class MetricsRegionServerSourceImpl
super(metricsName, metricsDescription, metricsContext, metricsJmxContext);
this.rsWrap = rsWrap;
putHisto = getMetricsRegistry().getHistogram(PUT_KEY);
deleteHisto = getMetricsRegistry().getHistogram(DELETE_KEY);
getHisto = getMetricsRegistry().getHistogram(GET_KEY);
incrementHisto = getMetricsRegistry().getHistogram(INCREMENT_KEY);
appendHisto = getMetricsRegistry().getHistogram(APPEND_KEY);
}
putHisto = getMetricsRegistry().newHistogram(MUTATE_KEY);
slowPut = getMetricsRegistry().newCounter(SLOW_MUTATE_KEY, SLOW_MUTATE_DESC, 0l);
@Override
public void init() {
super.init();
deleteHisto = getMetricsRegistry().newHistogram(DELETE_KEY);
slowDelete = getMetricsRegistry().newCounter(SLOW_DELETE_KEY, SLOW_DELETE_DESC, 0l);
getHisto = getMetricsRegistry().newHistogram(GET_KEY);
slowGet = getMetricsRegistry().newCounter(SLOW_GET_KEY, SLOW_GET_DESC, 0l);
incrementHisto = getMetricsRegistry().newHistogram(INCREMENT_KEY);
slowIncrement = getMetricsRegistry().newCounter(SLOW_INCREMENT_KEY, SLOW_INCREMENT_DESC, 0l);
appendHisto = getMetricsRegistry().newHistogram(APPEND_KEY);
slowAppend = getMetricsRegistry().newCounter(SLOW_APPEND_KEY, SLOW_APPEND_DESC, 0l);
}
@Override
@ -88,6 +102,31 @@ public class MetricsRegionServerSourceImpl
appendHisto.add(t);
}
@Override
public void incrSlowPut() {
slowPut.incr();
}
@Override
public void incrSlowDelete() {
slowDelete.incr();
}
@Override
public void incrSlowGet() {
slowGet.incr();
}
@Override
public void incrSlowIncrement() {
slowIncrement.incr();
}
@Override
public void incrSlowAppend() {
slowAppend.incr();
}
/**
* Yes this is a get function that doesn't return anything. Thanks Hadoop for breaking all
* expectations of java programmers. Instead of returning anything Hadoop metrics expects

View File

@ -65,7 +65,7 @@ public class MetricsRegionSourceImpl implements MetricsRegionSource {
String suffix = "Count";
regionPutKey = regionNamePrefix + MetricsRegionServerSource.PUT_KEY + suffix;
regionPutKey = regionNamePrefix + MetricsRegionServerSource.MUTATE_KEY + suffix;
regionPut = registry.getLongCounter(regionPutKey, 0l);
regionDeleteKey = regionNamePrefix + MetricsRegionServerSource.DELETE_KEY + suffix;

View File

@ -0,0 +1 @@
org.apache.hadoop.hbase.ipc.MetricsHBaseServerSourceFactoryImpl

View File

@ -24,6 +24,7 @@ import org.apache.hadoop.metrics2.AbstractMetric;
import org.apache.hadoop.metrics2.MetricsCollector;
import org.apache.hadoop.metrics2.MetricsInfo;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.MetricsSource;
import org.apache.hadoop.metrics2.MetricsTag;
import java.util.HashMap;
@ -193,7 +194,7 @@ public class MetricsAssertHelperImpl implements MetricsAssertHelper {
public long getCounter(String name, BaseSource source) {
getMetrics(source);
String cName = canonicalizeMetricName(name);
assertNotNull(counters.get(cName));
assertNotNull("Should get counter "+cName + " but did not",counters.get(cName));
return counters.get(cName).longValue();
}
@ -201,7 +202,7 @@ public class MetricsAssertHelperImpl implements MetricsAssertHelper {
public double getGaugeDouble(String name, BaseSource source) {
getMetrics(source);
String cName = canonicalizeMetricName(name);
assertNotNull(gauges.get(cName));
assertNotNull("Should get gauge "+cName + " but did not",gauges.get(cName));
return gauges.get(cName).doubleValue();
}
@ -209,8 +210,8 @@ public class MetricsAssertHelperImpl implements MetricsAssertHelper {
public long getGaugeLong(String name, BaseSource source) {
getMetrics(source);
String cName = canonicalizeMetricName(name);
assertNotNull(gauges.get(cName));
return gauges.get(cName).longValue();
assertNotNull("Should get gauge " + cName + " but did not", gauges.get(cName));
return gauges.get(cName).longValue();
}
@ -222,10 +223,10 @@ public class MetricsAssertHelperImpl implements MetricsAssertHelper {
private void getMetrics(BaseSource source) {
reset();
if (!(source instanceof BaseSourceImpl)) {
assertTrue(false);
if (!(source instanceof MetricsSource)) {
assertTrue("The Source passed must be a MetricsSource", false);
}
BaseSourceImpl impl = (BaseSourceImpl) source;
MetricsSource impl = (MetricsSource) source;
impl.getMetrics(new MockMetricsBuilder(), true);

View File

@ -1,53 +0,0 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.metrics.util.MBeanUtil;
import org.apache.hadoop.metrics.util.MetricsDynamicMBeanBase;
import org.apache.hadoop.metrics.util.MetricsRegistry;
import javax.management.ObjectName;
/**
* Exports HBase RPC statistics recorded in {@link HBaseRpcMetrics} as an MBean
* for JMX monitoring.
*/
@InterfaceAudience.Private
public class HBaseRPCStatistics extends MetricsDynamicMBeanBase {
private final ObjectName mbeanName;
@SuppressWarnings({"UnusedDeclaration"})
public HBaseRPCStatistics(MetricsRegistry registry,
String hostName, String port) {
super(registry, "Metrics for RPC server instance");
String name = String.format("RPCStatistics-%s",
(port != null ? port : "unknown"));
mbeanName = MBeanUtil.registerMBean("HBase", name, this);
}
public void shutdown() {
if (mbeanName != null)
MBeanUtil.unregisterMBean(mbeanName);
}
}

View File

@ -1,228 +0,0 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.AdminProtocol;
import org.apache.hadoop.hbase.client.ClientProtocol;
import org.apache.hadoop.hbase.MasterMonitorProtocol;
import org.apache.hadoop.hbase.MasterAdminProtocol;
import org.apache.hadoop.hbase.ipc.VersionedProtocol;
import org.apache.hadoop.metrics.MetricsContext;
import org.apache.hadoop.metrics.MetricsRecord;
import org.apache.hadoop.metrics.MetricsUtil;
import org.apache.hadoop.metrics.Updater;
import org.apache.hadoop.metrics.util.*;
import org.apache.hadoop.hbase.RegionServerStatusProtocol;
import java.lang.reflect.Method;
/**
*
* This class is for maintaining the various RPC statistics
* and publishing them through the metrics interfaces.
* This also registers the JMX MBean for RPC.
* <p>
* This class has a number of metrics variables that are publicly accessible;
* these variables (objects) have methods to update their values;
* for example:
* <p> {@link #rpcQueueTime}.inc(time)
*
*/
@InterfaceAudience.Private
public class HBaseRpcMetrics implements Updater {
public static final String NAME_DELIM = "$";
private final MetricsRegistry registry = new MetricsRegistry();
private final MetricsRecord metricsRecord;
private static Log LOG = LogFactory.getLog(HBaseRpcMetrics.class);
private final HBaseRPCStatistics rpcStatistics;
public HBaseRpcMetrics(String hostName, String port) {
MetricsContext context = MetricsUtil.getContext("rpc");
metricsRecord = MetricsUtil.createRecord(context, "metrics");
metricsRecord.setTag("port", port);
LOG.info("Initializing RPC Metrics for className="
+ hostName + " on port=" + port);
context.registerUpdater(this);
initMethods(MasterMonitorProtocol.class);
initMethods(MasterAdminProtocol.class);
initMethods(RegionServerStatusProtocol.class);
initMethods(ClientProtocol.class);
initMethods(AdminProtocol.class);
rpcStatistics = new HBaseRPCStatistics(this.registry, hostName, port);
}
/**
* The metrics variables are public:
* - they can be set directly by calling their set/inc methods
* -they can also be read directly - e.g. JMX does this.
*/
public final MetricsTimeVaryingLong receivedBytes =
new MetricsTimeVaryingLong("ReceivedBytes", registry);
public final MetricsTimeVaryingLong sentBytes =
new MetricsTimeVaryingLong("SentBytes", registry);
public final MetricsTimeVaryingRate rpcQueueTime =
new MetricsTimeVaryingRate("RpcQueueTime", registry);
public MetricsTimeVaryingRate rpcProcessingTime =
new MetricsTimeVaryingRate("RpcProcessingTime", registry);
public final MetricsIntValue numOpenConnections =
new MetricsIntValue("NumOpenConnections", registry);
public final MetricsIntValue callQueueLen =
new MetricsIntValue("callQueueLen", registry);
public final MetricsIntValue priorityCallQueueLen =
new MetricsIntValue("priorityCallQueueLen", registry);
public final MetricsIntValue responseQueueLen =
new MetricsIntValue("responseQueueLen", registry);
public final MetricsTimeVaryingInt authenticationFailures =
new MetricsTimeVaryingInt("rpcAuthenticationFailures", registry);
public final MetricsTimeVaryingInt authenticationSuccesses =
new MetricsTimeVaryingInt("rpcAuthenticationSuccesses", registry);
public final MetricsTimeVaryingInt authorizationFailures =
new MetricsTimeVaryingInt("rpcAuthorizationFailures", registry);
public final MetricsTimeVaryingInt authorizationSuccesses =
new MetricsTimeVaryingInt("rpcAuthorizationSuccesses", registry);
public MetricsTimeVaryingRate rpcSlowResponseTime =
new MetricsTimeVaryingRate("RpcSlowResponse", registry);
public final MetricsIntValue replicationCallQueueLen =
new MetricsIntValue("replicationCallQueueLen", registry);
private void initMethods(Class<? extends VersionedProtocol> protocol) {
for (Method m : protocol.getDeclaredMethods()) {
if (get(m.getName()) == null)
create(m.getName());
}
}
private MetricsTimeVaryingRate get(String key) {
return (MetricsTimeVaryingRate) registry.get(key);
}
private MetricsTimeVaryingRate create(String key) {
return new MetricsTimeVaryingRate(key, this.registry);
}
public void inc(String name, int amt) {
MetricsTimeVaryingRate m = get(name);
if (m == null) {
LOG.warn("Got inc() request for method that doesnt exist: " +
name);
return; // ignore methods that dont exist.
}
m.inc(amt);
}
/**
* Generate metrics entries for all the methods defined in the list of
* interfaces. A {@link MetricsTimeVaryingRate} counter will be created for
* each {@code Class.getMethods().getName()} entry.
* @param ifaces Define metrics for all methods in the given classes
*/
public void createMetrics(Class<?>[] ifaces) {
createMetrics(ifaces, false);
}
/**
* Generate metrics entries for all the methods defined in the list of
* interfaces. A {@link MetricsTimeVaryingRate} counter will be created for
* each {@code Class.getMethods().getName()} entry.
*
* <p>
* If {@code prefixWithClass} is {@code true}, each metric will be named as
* {@code [Class.getSimpleName()].[Method.getName()]}. Otherwise each metric
* will just be named according to the method -- {@code Method.getName()}.
* </p>
* @param ifaces Define metrics for all methods in the given classes
* @param prefixWithClass If {@code true}, each metric will be named as
* "classname.method"
*/
public void createMetrics(Class<?>[] ifaces, boolean prefixWithClass) {
createMetrics(ifaces, prefixWithClass, null);
}
/**
* Generate metrics entries for all the methods defined in the list of
* interfaces. A {@link MetricsTimeVaryingRate} counter will be created for
* each {@code Class.getMethods().getName()} entry.
*
* <p>
* If {@code prefixWithClass} is {@code true}, each metric will be named as
* {@code [Class.getSimpleName()].[Method.getName()]}. Otherwise each metric
* will just be named according to the method -- {@code Method.getName()}.
* </p>
*
* <p>
* Additionally, if {@code suffixes} is defined, additional metrics will be
* created for each method named as the original metric concatenated with
* the suffix.
* </p>
* @param ifaces Define metrics for all methods in the given classes
* @param prefixWithClass If {@code true}, each metric will be named as
* "classname.method"
* @param suffixes If not null, each method will get additional metrics ending
* in each of the suffixes.
*/
public void createMetrics(Class<?>[] ifaces, boolean prefixWithClass,
String [] suffixes) {
for (Class<?> iface : ifaces) {
Method[] methods = iface.getMethods();
for (Method method : methods) {
String attrName = prefixWithClass ?
getMetricName(iface, method.getName()) : method.getName();
if (get(attrName) == null)
create(attrName);
if (suffixes != null) {
// create metrics for each requested suffix
for (String s : suffixes) {
String metricName = attrName + s;
if (get(metricName) == null)
create(metricName);
}
}
}
}
}
public static String getMetricName(Class<?> c, String method) {
return c.getSimpleName() + NAME_DELIM + method;
}
/**
* Push the metrics to the monitoring subsystem on doUpdate() call.
*/
public void doUpdates(final MetricsContext context) {
// Both getMetricsList() and pushMetric() are thread-safe
for (MetricsBase m : registry.getMetricsList()) {
m.pushMetric(metricsRecord);
}
metricsRecord.update();
}
public void shutdown() {
if (rpcStatistics != null)
rpcStatistics.shutdown();
}
}

View File

@ -245,7 +245,7 @@ public abstract class HBaseServer implements RpcServer {
// connections to nuke
// during a cleanup
protected HBaseRpcMetrics rpcMetrics;
protected MetricsHBaseServer metrics;
protected Configuration conf;
@ -275,7 +275,7 @@ public abstract class HBaseServer implements RpcServer {
private Handler[] handlers = null;
private Handler[] priorityHandlers = null;
/** replication related queue; */
private BlockingQueue<Call> replicationQueue;
protected BlockingQueue<Call> replicationQueue;
private int numOfReplicationHandlers = 0;
private Handler[] replicationHandlers = null;
@ -765,7 +765,6 @@ public abstract class HBaseServer implements RpcServer {
reader.finishAdd();
}
}
rpcMetrics.numOpenConnections.set(numConnections);
}
void doRead(SelectionKey key) throws InterruptedException {
@ -1304,7 +1303,7 @@ public abstract class HBaseServer implements RpcServer {
}
doRawSaslReply(SaslStatus.ERROR, null, sendToClient.getClass().getName(),
sendToClient.getLocalizedMessage());
rpcMetrics.authenticationFailures.inc();
metrics.authenticationFailure();
String clientIP = this.toString();
// attempting user could be null
AUDITLOG.warn(AUTH_FAILED_FOR + clientIP + ":" + attemptingUser);
@ -1326,7 +1325,7 @@ public abstract class HBaseServer implements RpcServer {
+ user + ". Negotiated QoP is "
+ saslServer.getNegotiatedProperty(Sasl.QOP));
}
rpcMetrics.authenticationSuccesses.inc();
metrics.authenticationSuccess();
AUDITLOG.info(AUTH_SUCCESSFUL_FOR + user);
saslContextEstablished = true;
}
@ -1653,14 +1652,11 @@ public abstract class HBaseServer implements RpcServer {
if (priorityCallQueue != null && getQosLevel(rpcRequestBody) > highPriorityLevel) {
priorityCallQueue.put(call);
updateCallQueueLenMetrics(priorityCallQueue);
} else if (replicationQueue != null
&& getQosLevel(rpcRequestBody) == HConstants.REPLICATION_QOS) {
replicationQueue.put(call);
updateCallQueueLenMetrics(replicationQueue);
} else {
callQueue.put(call); // queue the call; maybe blocked here
updateCallQueueLenMetrics(callQueue);
}
}
@ -1678,10 +1674,10 @@ public abstract class HBaseServer implements RpcServer {
if (LOG.isDebugEnabled()) {
LOG.debug("Successfully authorized " + header);
}
rpcMetrics.authorizationSuccesses.inc();
metrics.authorizationSuccess();
} catch (AuthorizationException ae) {
LOG.debug("Connection authorization failed: "+ae.getMessage(), ae);
rpcMetrics.authorizationFailures.inc();
metrics.authorizationFailure();
setupResponse(authFailedResponse, authFailedCall, Status.FATAL,
ae.getClass().getName(), ae.getMessage());
responder.doRespond(authFailedCall);
@ -1731,23 +1727,6 @@ public abstract class HBaseServer implements RpcServer {
}
}
/**
* Reports length of the call queue to HBaseRpcMetrics.
* @param queue Which queue to report
*/
private void updateCallQueueLenMetrics(BlockingQueue<Call> queue) {
if (queue == callQueue) {
rpcMetrics.callQueueLen.set(callQueue.size());
} else if (queue == priorityCallQueue) {
rpcMetrics.priorityCallQueueLen.set(priorityCallQueue.size());
} else if (queue == replicationQueue) {
rpcMetrics.replicationCallQueueLen.set(replicationQueue.size());
} else {
LOG.warn("Unknown call queue");
}
rpcMetrics.responseQueueLen.set(responseQueueLen);
}
/** Handles queued calls . */
private class Handler extends Thread {
private final BlockingQueue<Call> myCallQueue;
@ -1778,7 +1757,6 @@ public abstract class HBaseServer implements RpcServer {
try {
status.pause("Waiting for a call");
Call call = myCallQueue.take(); // pop the queue; maybe blocked here
updateCallQueueLenMetrics(myCallQueue);
status.setStatus("Setting up call");
status.setConnection(call.connection.getHostAddress(),
call.connection.getRemotePort());
@ -1936,8 +1914,9 @@ public abstract class HBaseServer implements RpcServer {
// Start the listener here and let it bind to the port
listener = new Listener();
this.port = listener.getAddress().getPort();
this.rpcMetrics = new HBaseRpcMetrics(
serverName, Integer.toString(this.port));
this.metrics = new MetricsHBaseServer(
serverName, new MetricsHBaseServerWrapperImpl(this));
this.tcpNoDelay = conf.getBoolean("ipc.server.tcpnodelay", true);
this.tcpKeepAlive = conf.getBoolean("ipc.server.tcpkeepalive", true);
@ -1970,7 +1949,6 @@ public abstract class HBaseServer implements RpcServer {
* @param response buffer to serialize the response into
* @param call {@link Call} to which we are setting up the response
* @param status {@link Status} of the IPC call
* @param rv return value for the IPC Call, if the call was successful
* @param errorClass error class, if the the call failed
* @param error error message, if the call failed
* @throws IOException
@ -1990,7 +1968,6 @@ public abstract class HBaseServer implements RpcServer {
}
}
connection.close();
rpcMetrics.numOpenConnections.set(numConnections);
}
Configuration getConf() {
@ -2063,9 +2040,6 @@ public abstract class HBaseServer implements RpcServer {
listener.doStop();
responder.interrupt();
notifyAll();
if (this.rpcMetrics != null) {
this.rpcMetrics.shutdown();
}
}
private void stopHandlers(Handler[] handlers) {
@ -2111,8 +2085,8 @@ public abstract class HBaseServer implements RpcServer {
/**
* Returns the metrics instance for reporting RPC call statistics
*/
public HBaseRpcMetrics getRpcMetrics() {
return rpcMetrics;
public MetricsHBaseServer getMetrics() {
return metrics;
}
/**
@ -2167,7 +2141,7 @@ public abstract class HBaseServer implements RpcServer {
int count = (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
channel.write(buffer) : channelIO(null, channel, buffer);
if (count > 0) {
rpcMetrics.sentBytes.inc(count);
metrics.sentBytes(count);
}
return count;
}
@ -2190,8 +2164,8 @@ public abstract class HBaseServer implements RpcServer {
int count = (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
channel.read(buffer) : channelIO(channel, null, buffer);
if (count > 0) {
rpcMetrics.receivedBytes.inc(count);
}
metrics.receivedBytes(count);
}
return count;
}

View File

@ -0,0 +1,72 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
@InterfaceAudience.Private
public class MetricsHBaseServer {
private static Log LOG = LogFactory.getLog(MetricsHBaseServer.class);
private MetricsHBaseServerSource source;
public MetricsHBaseServer(String serverName, MetricsHBaseServerWrapper wrapper) {
source = CompatibilitySingletonFactory.getInstance(MetricsHBaseServerSourceFactory.class)
.create(serverName, wrapper);
}
void authorizationSuccess() {
source.authorizationSuccess();
}
void authorizationFailure() {
source.authorizationFailure();
}
void authenticationFailure() {
source.authenticationFailure();
}
void authenticationSuccess() {
source.authenticationSuccess();
}
void sentBytes(int count) {
source.sentBytes(count);
}
void receivedBytes(int count) {
source.receivedBytes(count);
}
void dequeuedCall(int qTime) {
source.dequeuedCall(qTime);
}
void processedCall(int processingTime) {
source.processedCall(processingTime);
}
public MetricsHBaseServerSource getMetricsSource() {
return source;
}
}

View File

@ -0,0 +1,69 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
public class MetricsHBaseServerWrapperImpl implements MetricsHBaseServerWrapper {
private HBaseServer server;
MetricsHBaseServerWrapperImpl(HBaseServer server) {
this.server = server;
}
@Override
public long getTotalQueueSize() {
if (this.server == null) {
return 0;
}
return server.callQueueSize.get();
}
@Override
public int getGeneralQueueLength() {
if (this.server == null || this.server.callQueue == null) {
return 0;
}
return server.callQueue.size();
}
@Override
public int getReplicationQueueLength() {
if (this.server == null || this.server.replicationQueue == null) {
return 0;
}
return server.replicationQueue.size();
}
@Override
public int getPriorityQueueLength() {
if (this.server == null || this.server.priorityCallQueue == null) {
return 0;
}
return server.priorityCallQueue.size();
}
@Override
public int getNumOpenConnections() {
if (this.server == null || this.server.connectionList == null) {
return 0;
}
return server.connectionList.size();
}
}

View File

@ -250,10 +250,6 @@ class ProtobufRpcEngine implements RpcEngine {
this.implementation = instance.getClass();
this.verbose = verbose;
// create metrics for the advertised interfaces this server implements.
String [] metricSuffixes = new String [] {ABOVE_ONE_SEC_METRIC};
this.rpcMetrics.createMetrics(ifaces, false, metricSuffixes);
this.warnResponseTime = conf.getInt(WARN_RESPONSE_TIME,
DEFAULT_WARN_RESPONSE_TIME);
this.warnResponseSize = conf.getInt(WARN_RESPONSE_SIZE,
@ -372,9 +368,9 @@ class ProtobufRpcEngine implements RpcEngine {
", request=" + param.toString() +
" response=" + result.toString());
}
rpcMetrics.rpcQueueTime.inc(qTime);
rpcMetrics.rpcProcessingTime.inc(processingTime);
rpcMetrics.inc(method.getName(), processingTime);
metrics.dequeuedCall(qTime);
metrics.processedCall(processingTime);
if (verbose) {
log("Return: "+result, LOG);
}
@ -398,17 +394,6 @@ class ProtobufRpcEngine implements RpcEngine {
methodName, buffer.toString(), (tooLarge ? "TooLarge" : "TooSlow"),
status.getClient(), startTime, processingTime, qTime,
responseSize);
// provides a count of log-reported slow responses
if (tooSlow) {
rpcMetrics.rpcSlowResponseTime.inc(processingTime);
}
}
if (processingTime > 1000) {
// we use a hard-coded one second period so that we can clearly
// indicate the time period we're warning about in the name of the
// metric itself
rpcMetrics.inc(method.getName() + ABOVE_ONE_SEC_METRIC,
processingTime);
}
return result;
} catch (InvocationTargetException e) {

View File

@ -23,7 +23,6 @@ import com.google.common.base.Function;
import com.google.protobuf.Message;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.ipc.VersionedProtocol;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody;
@ -67,5 +66,5 @@ public interface RpcServer {
/**
* Returns the metrics instance for reporting RPC call statistics
*/
HBaseRpcMetrics getRpcMetrics();
MetricsHBaseServer getMetrics();
}

View File

@ -19,7 +19,6 @@
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.io.StringWriter;
import java.lang.Thread.UncaughtExceptionHandler;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
@ -108,7 +107,7 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
import org.apache.hadoop.hbase.ipc.HBaseRPC;
import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
import org.apache.hadoop.hbase.ipc.HBaseRpcMetrics;
import org.apache.hadoop.hbase.ipc.MetricsHBaseServer;
import org.apache.hadoop.hbase.ipc.ProtocolSignature;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
@ -216,7 +215,6 @@ import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.zookeeper.KeeperException;
import org.cliffc.high_scale_lib.Counter;
import org.codehaus.jackson.map.ObjectMapper;
import com.google.common.base.Function;
import com.google.protobuf.ByteString;
@ -1622,14 +1620,6 @@ public class HRegionServer implements ClientProtocol,
}
/**
* Return a reference to the metrics instance used for counting RPC calls.
* @return Metrics instance.
*/
public HBaseRpcMetrics getRpcMetrics() {
return rpcServer.getRpcMetrics();
}
@Override
public RpcServer getRpcServer() {
return rpcServer;

View File

@ -54,22 +54,37 @@ public class MetricsRegionServer {
}
public void updatePut(long t){
if (t > 1000) {
serverSource.incrSlowPut();
}
serverSource.updatePut(t);
}
public void updateDelete(long t){
if (t > 1000) {
serverSource.incrSlowDelete();
}
serverSource.updateDelete(t);
}
public void updateGet(long t){
if (t > 1000) {
serverSource.incrSlowGet();
}
serverSource.updateGet(t);
}
public void updateIncrement(long t){
if (t > 1000) {
serverSource.incrSlowIncrement();
}
serverSource.updateIncrement(t);
}
public void updateAppend(long t){
if (t > 1000) {
serverSource.incrSlowAppend();
}
serverSource.updateAppend(t);
}
}

View File

@ -0,0 +1,28 @@
package org.apache.hadoop.hbase.ipc;
public class MetricsHBaseServerWrapperStub implements MetricsHBaseServerWrapper{
@Override
public long getTotalQueueSize() {
return 101;
}
@Override
public int getGeneralQueueLength() {
return 102;
}
@Override
public int getReplicationQueueLength() {
return 103;
}
@Override
public int getPriorityQueueLength() {
return 104;
}
@Override
public int getNumOpenConnections() {
return 105;
}
}

View File

@ -0,0 +1,115 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import org.apache.hadoop.hbase.CompatibilityFactory;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.test.MetricsAssertHelper;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import static org.junit.Assert.*;
@Category(SmallTests.class)
public class TestRpcMetrics {
public MetricsAssertHelper HELPER = CompatibilityFactory.getInstance(MetricsAssertHelper.class);
@Test
public void testFactory() {
MetricsHBaseServer masterMetrics = new MetricsHBaseServer("HMaster", new MetricsHBaseServerWrapperStub());
MetricsHBaseServerSource masterSource = masterMetrics.getMetricsSource();
MetricsHBaseServer rsMetrics = new MetricsHBaseServer("HRegionServer", new MetricsHBaseServerWrapperStub());
MetricsHBaseServerSource rsSource = rsMetrics.getMetricsSource();
assertEquals("master", masterSource.getMetricsContext());
assertEquals("regionserver", rsSource.getMetricsContext());
assertEquals("Master,sub=IPC", masterSource.getMetricsJmxContext());
assertEquals("RegionServer,sub=IPC", rsSource.getMetricsJmxContext());
assertEquals("IPC", masterSource.getMetricsName());
assertEquals("IPC", rsSource.getMetricsName());
}
/**
* This test makes sure that the numbers from a MetricsHBaseServerWrapper are correctly exported
* to hadoop metrics 2 system.
*/
@Test
public void testWrapperSource() {
MetricsHBaseServer mrpc = new MetricsHBaseServer("HMaster", new MetricsHBaseServerWrapperStub());
MetricsHBaseServerSource serverSource = mrpc.getMetricsSource();
HELPER.assertGauge("queueSize", 101, serverSource);
HELPER.assertGauge("numCallsInGeneralQueue", 102, serverSource);
HELPER.assertGauge("numCallsInReplicationQueue", 103, serverSource);
HELPER.assertGauge("numCallsInPriorityQueue", 104, serverSource);
HELPER.assertGauge("numOpenConnections", 105, serverSource);
}
/**
* Test to make sure that all the actively called method on MetricsHBaseServer work.
*/
@Test
public void testSourceMethods() {
MetricsHBaseServer mrpc = new MetricsHBaseServer("HMaster", new MetricsHBaseServerWrapperStub());
MetricsHBaseServerSource serverSource = mrpc.getMetricsSource();
for (int i=0; i < 12; i++) {
mrpc.authenticationFailure();
}
for (int i=0; i < 13; i++) {
mrpc.authenticationSuccess();
}
HELPER.assertCounter("authenticationFailures", 12, serverSource);
HELPER.assertCounter("authenticationSuccesses", 13, serverSource);
for (int i=0; i < 14; i++) {
mrpc.authorizationSuccess();
}
for (int i=0; i < 15; i++) {
mrpc.authorizationFailure();
}
HELPER.assertCounter("authorizationSuccesses", 14, serverSource);
HELPER.assertCounter("authorizationFailures", 15, serverSource);
mrpc.dequeuedCall(100);
mrpc.processedCall(101);
HELPER.assertCounter("queueCallTime_NumOps", 1, serverSource);
HELPER.assertCounter("processCallTime_NumOps", 1, serverSource);
mrpc.sentBytes(103);
mrpc.sentBytes(103);
mrpc.sentBytes(103);
mrpc.receivedBytes(104);
mrpc.receivedBytes(104);
HELPER.assertCounter("sentBytes", 309, serverSource);
HELPER.assertCounter("receivedBytes", 208, serverSource);
}
}

View File

@ -38,7 +38,6 @@ public class TestMetricsRegionServer {
@Test
public void testWrapperSource() {
MetricsRegionServer rsm = new MetricsRegionServer(new MetricsRegionServerWrapperStub());
MetricsRegionServerSource serverSource = rsm.getMetricsSource();
HELPER.assertTag("serverName", "test", serverSource);
@ -80,4 +79,44 @@ public class TestMetricsRegionServer {
assertNotNull("There should be a hadoop1/hadoop2 metrics source", rsm.getMetricsSource() );
assertNotNull("The RegionServerMetricsWrapper should be accessable", rsm.getRegionServerWrapper());
}
@Test
public void testSlowCount() {
MetricsRegionServer rsm = new MetricsRegionServer(new MetricsRegionServerWrapperStub());
MetricsRegionServerSource serverSource = rsm.getMetricsSource();
for (int i=0; i < 12; i ++) {
rsm.updateAppend(12);
rsm.updateAppend(1002);
}
for (int i=0; i < 13; i ++) {
rsm.updateDelete(13);
rsm.updateDelete(1003);
}
for (int i=0; i < 14; i ++) {
rsm.updateGet(14);
rsm.updateGet(1004);
}
for (int i=0; i < 15; i ++) {
rsm.updateIncrement(15);
rsm.updateIncrement(1005);
}
for (int i=0; i < 16; i ++) {
rsm.updatePut(16);
rsm.updatePut(1006);
}
HELPER.assertCounter("appendNumOps", 24, serverSource);
HELPER.assertCounter("deleteNumOps", 26, serverSource);
HELPER.assertCounter("getNumOps", 28, serverSource);
HELPER.assertCounter("incrementNumOps", 30, serverSource);
HELPER.assertCounter("mutateNumOps", 32, serverSource);
HELPER.assertCounter("slowAppendCount", 12, serverSource);
HELPER.assertCounter("slowDeleteCount", 13, serverSource);
HELPER.assertCounter("slowGetCount", 14, serverSource);
HELPER.assertCounter("slowIncrementCount", 15, serverSource);
HELPER.assertCounter("slowPutCount", 16, serverSource);
}
}

View File

@ -141,7 +141,7 @@ public class TestRegionServerMetrics {
.getAggregateSource();
String prefix = "table."+tableNameString + ".region." + i.getEncodedName();
metricsHelper.assertCounter(prefix + ".getCount", 10, agg);
metricsHelper.assertCounter(prefix + ".multiPutCount", 30, agg);
metricsHelper.assertCounter(prefix + ".mutateCount", 30, agg);
}

View File

@ -1,138 +0,0 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.ipc.HBaseRpcMetrics;
import org.apache.hadoop.metrics.ContextFactory;
import org.apache.hadoop.metrics.MetricsContext;
import org.apache.hadoop.metrics.MetricsUtil;
import org.apache.hadoop.metrics.spi.AbstractMetricsContext;
import org.apache.hadoop.metrics.spi.OutputRecord;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import static org.junit.Assert.*;
@Category(MediumTests.class)
public class TestRpcMetrics {
/**
* Defines test methods to register with HBaseRpcMetrics
*/
public interface TestMetrics {
public void test();
}
/**
* HRegionServer sub-class to register custom metrics
*/
public static class TestRegionServer extends HRegionServer {
public TestRegionServer(Configuration conf)
throws IOException, InterruptedException {
super(conf);
// register custom metrics interface
getRpcMetrics().createMetrics(new Class[]{TestMetrics.class}, true);
}
public void incTest(int amt) {
HBaseRpcMetrics metrics = getRpcMetrics();
// force an increment so we have something to check for
metrics.inc(metrics.getMetricName(TestMetrics.class, "test"), amt);
}
}
/**
* Dummy metrics context to allow retrieval of values
*/
public static class MockMetricsContext extends AbstractMetricsContext {
public MockMetricsContext() {
// update every 1 sec.
setPeriod(1);
}
@Override
protected void emitRecord(String contextName, String recordName,
OutputRecord outputRecord) throws IOException {
for (String name : outputRecord.getMetricNames()) {
Number val = outputRecord.getMetric(name);
if (val != null && val.intValue() > 0) {
METRICS.put(name, Boolean.TRUE);
LOG.debug("Set metric "+name+" to "+val);
}
}
}
}
private static Map<String,Boolean> METRICS = new HashMap<String,Boolean>();
private static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static Log LOG = LogFactory.getLog(TestRpcMetrics.class);
@BeforeClass
public static void setupBeforeClass() throws Exception {
// set custom metrics context
ContextFactory factory = ContextFactory.getFactory();
factory.setAttribute("rpc.class", MockMetricsContext.class.getName());
// make sure metrics context is setup, otherwise updating won't start
MetricsContext ctx = MetricsUtil.getContext("rpc");
assertTrue("Wrong MetricContext implementation class",
(ctx instanceof MockMetricsContext));
TEST_UTIL.startMiniZKCluster();
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
TEST_UTIL.shutdownMiniZKCluster();
}
@Test
public void testCustomMetrics() throws Exception {
TEST_UTIL.getConfiguration().setInt("hbase.regionserver.port", 0);
TestRegionServer rs = new TestRegionServer(TEST_UTIL.getConfiguration());
rs.incTest(5);
// wait for metrics context update
Thread.sleep(1000);
String metricName = HBaseRpcMetrics.getMetricName(TestMetrics.class, "test");
assertTrue("Metric should have set incremented for "+metricName,
wasSet(metricName + "_num_ops"));
}
public boolean wasSet(String name) {
return METRICS.get(name) != null ? METRICS.get(name) : false;
}
}