HBASE-12911 Client-side metrics

This commit is contained in:
Nick Dimiduk 2015-10-05 10:19:40 -07:00
parent e1fd3526b1
commit 7e30436e3f
24 changed files with 673 additions and 116 deletions

View File

@ -189,6 +189,10 @@
<artifactId>log4j</artifactId> <artifactId>log4j</artifactId>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>com.yammer.metrics</groupId>
<artifactId>metrics-core</artifactId>
</dependency>
</dependencies> </dependencies>
<profiles> <profiles>

View File

@ -297,4 +297,9 @@ public interface ClusterConnection extends HConnection {
*/ */
ClientBackoffPolicy getBackoffPolicy(); ClientBackoffPolicy getBackoffPolicy();
/**
* @return the MetricsConnection instance associated with this connection.
*/
public MetricsConnection getConnectionMetrics();
} }

View File

@ -174,5 +174,4 @@ public interface Connection extends Abortable, Closeable {
* @return true if this connection is closed * @return true if this connection is closed
*/ */
boolean isClosed(); boolean isClosed();
} }

View File

@ -18,6 +18,8 @@
*/ */
package org.apache.hadoop.hbase.client; package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.client.MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.BlockingRpcChannel; import com.google.protobuf.BlockingRpcChannel;
import com.google.protobuf.RpcController; import com.google.protobuf.RpcController;
@ -165,11 +167,12 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
// Client rpc instance. // Client rpc instance.
private RpcClient rpcClient; private RpcClient rpcClient;
private MetaCache metaCache = new MetaCache(); private final MetaCache metaCache;
private final MetricsConnection metrics;
private int refCount; private int refCount;
private User user; protected User user;
private RpcRetryingCallerFactory rpcCallerFactory; private RpcRetryingCallerFactory rpcCallerFactory;
@ -219,6 +222,12 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, interceptor, this.stats); this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, interceptor, this.stats);
this.backoffPolicy = ClientBackoffPolicyFactory.create(conf); this.backoffPolicy = ClientBackoffPolicyFactory.create(conf);
this.asyncProcess = createAsyncProcess(this.conf); this.asyncProcess = createAsyncProcess(this.conf);
if (conf.getBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, false)) {
this.metrics = new MetricsConnection(this);
} else {
this.metrics = null;
}
this.metaCache = new MetaCache(this.metrics);
boolean shouldListen = conf.getBoolean(HConstants.STATUS_PUBLISHED, boolean shouldListen = conf.getBoolean(HConstants.STATUS_PUBLISHED,
HConstants.STATUS_PUBLISHED_DEFAULT); HConstants.STATUS_PUBLISHED_DEFAULT);
@ -377,6 +386,11 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
return new HBaseAdmin(this); return new HBaseAdmin(this);
} }
@Override
public MetricsConnection getConnectionMetrics() {
return this.metrics;
}
private ExecutorService getBatchPool() { private ExecutorService getBatchPool() {
if (batchPool == null) { if (batchPool == null) {
synchronized (this) { synchronized (this) {
@ -2140,6 +2154,9 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
} }
closeMaster(); closeMaster();
shutdownPools(); shutdownPools();
if (this.metrics != null) {
this.metrics.shutdown();
}
this.closed = true; this.closed = true;
closeZooKeeperWatcher(); closeZooKeeperWatcher();
this.stubs.clear(); this.stubs.clear();

View File

@ -59,6 +59,12 @@ public class MetaCache {
// The access to this attribute must be protected by a lock on cachedRegionLocations // The access to this attribute must be protected by a lock on cachedRegionLocations
private final Set<ServerName> cachedServers = new ConcurrentSkipListSet<ServerName>(); private final Set<ServerName> cachedServers = new ConcurrentSkipListSet<ServerName>();
private final MetricsConnection metrics;
public MetaCache(MetricsConnection metrics) {
this.metrics = metrics;
}
/** /**
* Search the cache for a location that fits our table and row key. * Search the cache for a location that fits our table and row key.
* Return null if no suitable region is located. * Return null if no suitable region is located.
@ -74,6 +80,7 @@ public class MetaCache {
Entry<byte[], RegionLocations> e = tableLocations.floorEntry(row); Entry<byte[], RegionLocations> e = tableLocations.floorEntry(row);
if (e == null) { if (e == null) {
if (metrics != null) metrics.incrMetaCacheMiss();
return null; return null;
} }
RegionLocations possibleRegion = e.getValue(); RegionLocations possibleRegion = e.getValue();
@ -94,10 +101,12 @@ public class MetaCache {
// HConstants.EMPTY_END_ROW) check itself will pass. // HConstants.EMPTY_END_ROW) check itself will pass.
if (Bytes.equals(endKey, HConstants.EMPTY_END_ROW) || if (Bytes.equals(endKey, HConstants.EMPTY_END_ROW) ||
Bytes.compareTo(endKey, 0, endKey.length, row, 0, row.length) > 0) { Bytes.compareTo(endKey, 0, endKey.length, row, 0, row.length) > 0) {
if (metrics != null) metrics.incrMetaCacheHit();
return possibleRegion; return possibleRegion;
} }
// Passed all the way through, so we got nothing - complete cache miss // Passed all the way through, so we got nothing - complete cache miss
if (metrics != null) metrics.incrMetaCacheMiss();
return null; return null;
} }

View File

@ -0,0 +1,324 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.Descriptors.MethodDescriptor;
import com.google.protobuf.Message;
import com.yammer.metrics.core.Counter;
import com.yammer.metrics.core.Histogram;
import com.yammer.metrics.core.MetricsRegistry;
import com.yammer.metrics.core.Timer;
import com.yammer.metrics.reporting.JmxReporter;
import com.yammer.metrics.util.RatioGauge;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* This class is for maintaining the various connection statistics and publishing them through
* the metrics interfaces.
*
* This class manages its own {@link MetricsRegistry} and {@link JmxReporter} so as to not
* conflict with other uses of Yammer Metrics within the client application. Instantiating
* this class implicitly creates and "starts" instances of these classes; be sure to call
* {@link #shutdown()} to terminate the thread pools they allocate.
*/
@InterfaceAudience.Private
public class MetricsConnection {
/** Set this key to {@code true} to enable metrics collection of client requests. */
public static final String CLIENT_SIDE_METRICS_ENABLED_KEY = "hbase.client.metrics.enable";
private static final String DRTN_BASE = "rpcCallDurationMs_";
private static final String REQ_BASE = "rpcCallRequestSizeBytes_";
private static final String RESP_BASE = "rpcCallResponseSizeBytes_";
private static final String CLIENT_SVC = ClientService.getDescriptor().getName();
/** A container class for collecting details about the RPC call as it percolates. */
public static class CallStats {
private long requestSizeBytes = 0;
private long responseSizeBytes = 0;
private long startTime = 0;
private long callTimeMs = 0;
public long getRequestSizeBytes() {
return requestSizeBytes;
}
public void setRequestSizeBytes(long requestSizeBytes) {
this.requestSizeBytes = requestSizeBytes;
}
public long getResponseSizeBytes() {
return responseSizeBytes;
}
public void setResponseSizeBytes(long responseSizeBytes) {
this.responseSizeBytes = responseSizeBytes;
}
public long getStartTime() {
return startTime;
}
public void setStartTime(long startTime) {
this.startTime = startTime;
}
public long getCallTimeMs() {
return callTimeMs;
}
public void setCallTimeMs(long callTimeMs) {
this.callTimeMs = callTimeMs;
}
}
@VisibleForTesting
protected final class CallTracker {
private final String name;
@VisibleForTesting final Timer callTimer;
@VisibleForTesting final Histogram reqHist;
@VisibleForTesting final Histogram respHist;
private CallTracker(MetricsRegistry registry, String name, String subName, String scope) {
StringBuilder sb = new StringBuilder(CLIENT_SVC).append("_").append(name);
if (subName != null) {
sb.append("(").append(subName).append(")");
}
this.name = sb.toString();
this.callTimer = registry.newTimer(MetricsConnection.class, DRTN_BASE + this.name, scope);
this.reqHist = registry.newHistogram(MetricsConnection.class, REQ_BASE + this.name, scope);
this.respHist = registry.newHistogram(MetricsConnection.class, RESP_BASE + this.name, scope);
}
private CallTracker(MetricsRegistry registry, String name, String scope) {
this(registry, name, null, scope);
}
public void updateRpc(CallStats stats) {
this.callTimer.update(stats.getCallTimeMs(), TimeUnit.MILLISECONDS);
this.reqHist.update(stats.getRequestSizeBytes());
this.respHist.update(stats.getResponseSizeBytes());
}
@Override
public String toString() {
return "CallTracker:" + name;
}
}
/** A lambda for dispatching to the appropriate metric factory method */
private static interface NewMetric<T> {
T newMetric(Class<?> clazz, String name, String scope);
}
/** Anticipated number of metric entries */
private static final int CAPACITY = 50;
/** Default load factor from {@link java.util.HashMap#DEFAULT_LOAD_FACTOR} */
private static final float LOAD_FACTOR = 0.75f;
/**
* Anticipated number of concurrent accessor threads, from
* {@link ConnectionImplementation#getBatchPool()}
*/
private static final int CONCURRENCY_LEVEL = 256;
private final MetricsRegistry registry;
private final JmxReporter reporter;
private final String scope;
private final NewMetric<Timer> timerFactory = new NewMetric<Timer>() {
@Override public Timer newMetric(Class<?> clazz, String name, String scope) {
return registry.newTimer(clazz, name, scope);
}
};
private final NewMetric<Histogram> histogramFactory = new NewMetric<Histogram>() {
@Override public Histogram newMetric(Class<?> clazz, String name, String scope) {
return registry.newHistogram(clazz, name, scope);
}
};
// static metrics
@VisibleForTesting protected final Counter metaCacheHits;
@VisibleForTesting protected final Counter metaCacheMisses;
@VisibleForTesting protected final CallTracker getTracker;
@VisibleForTesting protected final CallTracker scanTracker;
@VisibleForTesting protected final CallTracker appendTracker;
@VisibleForTesting protected final CallTracker deleteTracker;
@VisibleForTesting protected final CallTracker incrementTracker;
@VisibleForTesting protected final CallTracker putTracker;
@VisibleForTesting protected final CallTracker multiTracker;
// dynamic metrics
// These maps are used to cache references to the metric instances that are managed by the
// registry. I don't think their use perfectly removes redundant allocations, but it's
// a big improvement over calling registry.newMetric each time.
@VisibleForTesting protected final ConcurrentMap<String, Timer> rpcTimers =
new ConcurrentHashMap<>(CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL);
@VisibleForTesting protected final ConcurrentMap<String, Histogram> rpcHistograms =
new ConcurrentHashMap<>(CAPACITY * 2 /* tracking both request and response sizes */,
LOAD_FACTOR, CONCURRENCY_LEVEL);
public MetricsConnection(final ConnectionImplementation conn) {
this.scope = conn.toString();
this.registry = new MetricsRegistry();
final ThreadPoolExecutor batchPool = (ThreadPoolExecutor) conn.getCurrentBatchPool();
final ThreadPoolExecutor metaPool = (ThreadPoolExecutor) conn.getCurrentMetaLookupPool();
this.registry.newGauge(this.getClass(), "executorPoolActiveThreads", scope,
new RatioGauge() {
@Override protected double getNumerator() {
return batchPool.getActiveCount();
}
@Override protected double getDenominator() {
return batchPool.getMaximumPoolSize();
}
});
this.registry.newGauge(this.getClass(), "metaPoolActiveThreads", scope,
new RatioGauge() {
@Override protected double getNumerator() {
return metaPool.getActiveCount();
}
@Override protected double getDenominator() {
return metaPool.getMaximumPoolSize();
}
});
this.metaCacheHits = registry.newCounter(this.getClass(), "metaCacheHits", scope);
this.metaCacheMisses = registry.newCounter(this.getClass(), "metaCacheMisses", scope);
this.getTracker = new CallTracker(this.registry, "Get", scope);
this.scanTracker = new CallTracker(this.registry, "Scan", scope);
this.appendTracker = new CallTracker(this.registry, "Mutate", "Append", scope);
this.deleteTracker = new CallTracker(this.registry, "Mutate", "Delete", scope);
this.incrementTracker = new CallTracker(this.registry, "Mutate", "Increment", scope);
this.putTracker = new CallTracker(this.registry, "Mutate", "Put", scope);
this.multiTracker = new CallTracker(this.registry, "Multi", scope);
this.reporter = new JmxReporter(this.registry);
this.reporter.start();
}
public void shutdown() {
this.reporter.shutdown();
this.registry.shutdown();
}
/** Produce an instance of {@link CallStats} for clients to attach to RPCs. */
public static CallStats newCallStats() {
// TODO: instance pool to reduce GC?
return new CallStats();
}
/** Increment the number of meta cache hits. */
public void incrMetaCacheHit() {
metaCacheHits.inc();
}
/** Increment the number of meta cache misses. */
public void incrMetaCacheMiss() {
metaCacheMisses.inc();
}
/**
* Get a metric for {@code key} from {@code map}, or create it with {@code factory}.
*/
private <T> T getMetric(String key, ConcurrentMap<String, T> map, NewMetric<T> factory) {
T t = map.get(key);
if (t == null) {
t = factory.newMetric(this.getClass(), key, scope);
map.putIfAbsent(key, t);
}
return t;
}
/** Update call stats for non-critical-path methods */
private void updateRpcGeneric(MethodDescriptor method, CallStats stats) {
final String methodName = method.getService().getName() + "_" + method.getName();
getMetric(DRTN_BASE + methodName, rpcTimers, timerFactory)
.update(stats.getCallTimeMs(), TimeUnit.MILLISECONDS);
getMetric(REQ_BASE + methodName, rpcHistograms, histogramFactory)
.update(stats.getRequestSizeBytes());
getMetric(RESP_BASE + methodName, rpcHistograms, histogramFactory)
.update(stats.getResponseSizeBytes());
}
/** Report RPC context to metrics system. */
public void updateRpc(MethodDescriptor method, Message param, CallStats stats) {
// this implementation is tied directly to protobuf implementation details. would be better
// if we could dispatch based on something static, ie, request Message type.
if (method.getService() == ClientService.getDescriptor()) {
switch(method.getIndex()) {
case 0:
assert "Get".equals(method.getName());
getTracker.updateRpc(stats);
return;
case 1:
assert "Mutate".equals(method.getName());
final MutationType mutationType = ((MutateRequest) param).getMutation().getMutateType();
switch(mutationType) {
case APPEND:
appendTracker.updateRpc(stats);
return;
case DELETE:
deleteTracker.updateRpc(stats);
return;
case INCREMENT:
incrementTracker.updateRpc(stats);
return;
case PUT:
putTracker.updateRpc(stats);
return;
default:
throw new RuntimeException("Unrecognized mutation type " + mutationType);
}
case 2:
assert "Scan".equals(method.getName());
scanTracker.updateRpc(stats);
return;
case 3:
assert "BulkLoadHFile".equals(method.getName());
// use generic implementation
break;
case 4:
assert "ExecService".equals(method.getName());
// use generic implementation
break;
case 5:
assert "ExecRegionServerService".equals(method.getName());
// use generic implementation
break;
case 6:
assert "Multi".equals(method.getName());
multiTracker.updateRpc(stats);
return;
default:
throw new RuntimeException("Unrecognized ClientService RPC type " + method.getFullName());
}
}
// Fallback to dynamic registry lookup for DDL methods.
updateRpcGeneric(method, stats);
}
}

View File

@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.MetricsConnection;
import org.apache.hadoop.hbase.codec.Codec; import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.codec.KeyValueCodec; import org.apache.hadoop.hbase.codec.KeyValueCodec;
import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.User;
@ -55,6 +56,7 @@ public abstract class AbstractRpcClient implements RpcClient {
protected final Configuration conf; protected final Configuration conf;
protected String clusterId; protected String clusterId;
protected final SocketAddress localAddr; protected final SocketAddress localAddr;
protected final MetricsConnection metrics;
protected UserProvider userProvider; protected UserProvider userProvider;
protected final IPCUtil ipcUtil; protected final IPCUtil ipcUtil;
@ -79,8 +81,10 @@ public abstract class AbstractRpcClient implements RpcClient {
* @param conf configuration * @param conf configuration
* @param clusterId the cluster id * @param clusterId the cluster id
* @param localAddr client socket bind address. * @param localAddr client socket bind address.
* @param metrics the connection metrics
*/ */
public AbstractRpcClient(Configuration conf, String clusterId, SocketAddress localAddr) { public AbstractRpcClient(Configuration conf, String clusterId, SocketAddress localAddr,
MetricsConnection metrics) {
this.userProvider = UserProvider.instantiate(conf); this.userProvider = UserProvider.instantiate(conf);
this.localAddr = localAddr; this.localAddr = localAddr;
this.tcpKeepAlive = conf.getBoolean("hbase.ipc.client.tcpkeepalive", true); this.tcpKeepAlive = conf.getBoolean("hbase.ipc.client.tcpkeepalive", true);
@ -100,6 +104,7 @@ public abstract class AbstractRpcClient implements RpcClient {
this.connectTO = conf.getInt(SOCKET_TIMEOUT_CONNECT, DEFAULT_SOCKET_TIMEOUT_CONNECT); this.connectTO = conf.getInt(SOCKET_TIMEOUT_CONNECT, DEFAULT_SOCKET_TIMEOUT_CONNECT);
this.readTO = conf.getInt(SOCKET_TIMEOUT_READ, DEFAULT_SOCKET_TIMEOUT_READ); this.readTO = conf.getInt(SOCKET_TIMEOUT_READ, DEFAULT_SOCKET_TIMEOUT_READ);
this.writeTO = conf.getInt(SOCKET_TIMEOUT_WRITE, DEFAULT_SOCKET_TIMEOUT_WRITE); this.writeTO = conf.getInt(SOCKET_TIMEOUT_WRITE, DEFAULT_SOCKET_TIMEOUT_WRITE);
this.metrics = metrics;
// login the server principal (if using secure Hadoop) // login the server principal (if using secure Hadoop)
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
@ -205,19 +210,20 @@ public abstract class AbstractRpcClient implements RpcClient {
pcrc = new PayloadCarryingRpcController(); pcrc = new PayloadCarryingRpcController();
} }
long startTime = 0;
if (LOG.isTraceEnabled()) {
startTime = EnvironmentEdgeManager.currentTime();
}
Pair<Message, CellScanner> val; Pair<Message, CellScanner> val;
try { try {
val = call(pcrc, md, param, returnType, ticket, isa); final MetricsConnection.CallStats cs = MetricsConnection.newCallStats();
cs.setStartTime(EnvironmentEdgeManager.currentTime());
val = call(pcrc, md, param, returnType, ticket, isa, cs);
// Shove the results into controller so can be carried across the proxy/pb service void. // Shove the results into controller so can be carried across the proxy/pb service void.
pcrc.setCellScanner(val.getSecond()); pcrc.setCellScanner(val.getSecond());
cs.setCallTimeMs(EnvironmentEdgeManager.currentTime() - cs.getStartTime());
if (metrics != null) {
metrics.updateRpc(md, param, cs);
}
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
long callTime = EnvironmentEdgeManager.currentTime() - startTime; LOG.trace("Call: " + md.getName() + ", callTime: " + cs.getCallTimeMs() + "ms");
LOG.trace("Call: " + md.getName() + ", callTime: " + callTime + "ms");
} }
return val.getFirst(); return val.getFirst();
} catch (Throwable e) { } catch (Throwable e) {
@ -242,7 +248,8 @@ public abstract class AbstractRpcClient implements RpcClient {
*/ */
protected abstract Pair<Message, CellScanner> call(PayloadCarryingRpcController pcrc, protected abstract Pair<Message, CellScanner> call(PayloadCarryingRpcController pcrc,
Descriptors.MethodDescriptor md, Message param, Message returnType, User ticket, Descriptors.MethodDescriptor md, Message param, Message returnType, User ticket,
InetSocketAddress isa) throws IOException, InterruptedException; InetSocketAddress isa, MetricsConnection.CallStats callStats)
throws IOException, InterruptedException;
@Override @Override
public BlockingRpcChannel createBlockingRpcChannel(final ServerName sn, final User ticket, public BlockingRpcChannel createBlockingRpcChannel(final ServerName sn, final User ticket,

View File

@ -25,6 +25,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.MetricsConnection;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ExceptionUtil; import org.apache.hadoop.hbase.util.ExceptionUtil;
@ -49,6 +50,7 @@ public class AsyncCall extends DefaultPromise<Message> {
final Message responseDefaultType; final Message responseDefaultType;
final long startTime; final long startTime;
final long rpcTimeout; final long rpcTimeout;
final MetricsConnection.CallStats callStats;
/** /**
* Constructor * Constructor
@ -61,7 +63,8 @@ public class AsyncCall extends DefaultPromise<Message> {
* @param responseDefaultType the default response type * @param responseDefaultType the default response type
*/ */
public AsyncCall(EventLoop eventLoop, int connectId, Descriptors.MethodDescriptor md, Message public AsyncCall(EventLoop eventLoop, int connectId, Descriptors.MethodDescriptor md, Message
param, PayloadCarryingRpcController controller, Message responseDefaultType) { param, PayloadCarryingRpcController controller, Message responseDefaultType,
MetricsConnection.CallStats callStats) {
super(eventLoop); super(eventLoop);
this.id = connectId; this.id = connectId;
@ -73,6 +76,7 @@ public class AsyncCall extends DefaultPromise<Message> {
this.startTime = EnvironmentEdgeManager.currentTime(); this.startTime = EnvironmentEdgeManager.currentTime();
this.rpcTimeout = controller.hasCallTimeout() ? controller.getCallTimeout() : 0; this.rpcTimeout = controller.hasCallTimeout() ? controller.getCallTimeout() : 0;
this.callStats = callStats;
} }
/** /**

View File

@ -49,6 +49,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.MetricsConnection;
import org.apache.hadoop.hbase.exceptions.ConnectionClosingException; import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos; import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos;
@ -310,10 +311,10 @@ public class AsyncRpcChannel {
*/ */
public Promise<Message> callMethod(final Descriptors.MethodDescriptor method, public Promise<Message> callMethod(final Descriptors.MethodDescriptor method,
final PayloadCarryingRpcController controller, final Message request, final PayloadCarryingRpcController controller, final Message request,
final Message responsePrototype) { final Message responsePrototype, MetricsConnection.CallStats callStats) {
final AsyncCall call = final AsyncCall call =
new AsyncCall(channel.eventLoop(), client.callIdCnt.getAndIncrement(), method, request, new AsyncCall(channel.eventLoop(), client.callIdCnt.getAndIncrement(), method, request,
controller, responsePrototype); controller, responsePrototype, callStats);
controller.notifyOnCancel(new RpcCallback<Object>() { controller.notifyOnCancel(new RpcCallback<Object>() {
@Override @Override
public void run(Object parameter) { public void run(Object parameter) {
@ -433,7 +434,7 @@ public class AsyncRpcChannel {
ByteBuf b = channel.alloc().directBuffer(4 + totalSize); ByteBuf b = channel.alloc().directBuffer(4 + totalSize);
try(ByteBufOutputStream out = new ByteBufOutputStream(b)) { try(ByteBufOutputStream out = new ByteBufOutputStream(b)) {
IPCUtil.write(out, rh, call.param, cellBlock); call.callStats.setRequestSizeBytes(IPCUtil.write(out, rh, call.param, cellBlock));
} }
channel.writeAndFlush(b).addListener(new CallWriteListener(this, call.id)); channel.writeAndFlush(b).addListener(new CallWriteListener(this, call.id));
@ -579,8 +580,6 @@ public class AsyncRpcChannel {
/** /**
* Clean up calls. * Clean up calls.
*
* @param cleanAll true if all calls should be cleaned, false for only the timed out calls
*/ */
private void cleanupCalls() { private void cleanupCalls() {
List<AsyncCall> toCleanup = new ArrayList<AsyncCall>(); List<AsyncCall> toCleanup = new ArrayList<AsyncCall>();

View File

@ -52,7 +52,9 @@ import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.MetricsConnection;
import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.JVM; import org.apache.hadoop.hbase.util.JVM;
import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.PoolMap; import org.apache.hadoop.hbase.util.PoolMap;
@ -146,12 +148,13 @@ public class AsyncRpcClient extends AbstractRpcClient {
* @param configuration to HBase * @param configuration to HBase
* @param clusterId for the cluster * @param clusterId for the cluster
* @param localAddress local address to connect to * @param localAddress local address to connect to
* @param metrics the connection metrics
* @param channelInitializer for custom channel handlers * @param channelInitializer for custom channel handlers
*/ */
@VisibleForTesting protected AsyncRpcClient(Configuration configuration, String clusterId,
AsyncRpcClient(Configuration configuration, String clusterId, SocketAddress localAddress, SocketAddress localAddress, MetricsConnection metrics,
ChannelInitializer<SocketChannel> channelInitializer) { ChannelInitializer<SocketChannel> channelInitializer) {
super(configuration, clusterId, localAddress); super(configuration, clusterId, localAddress, metrics);
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Starting async Hbase RPC client"); LOG.debug("Starting async Hbase RPC client");
@ -191,15 +194,28 @@ public class AsyncRpcClient extends AbstractRpcClient {
} }
} }
/** Used in test only. */
AsyncRpcClient(Configuration configuration) {
this(configuration, HConstants.CLUSTER_ID_DEFAULT, null, null);
}
/** Used in test only. */
AsyncRpcClient(Configuration configuration,
ChannelInitializer<SocketChannel> channelInitializer) {
this(configuration, HConstants.CLUSTER_ID_DEFAULT, null, null, channelInitializer);
}
/** /**
* Constructor * Constructor
* *
* @param configuration to HBase * @param configuration to HBase
* @param clusterId for the cluster * @param clusterId for the cluster
* @param localAddress local address to connect to * @param localAddress local address to connect to
* @param metrics the connection metrics
*/ */
public AsyncRpcClient(Configuration configuration, String clusterId, SocketAddress localAddress) { public AsyncRpcClient(Configuration configuration, String clusterId, SocketAddress localAddress,
this(configuration, clusterId, localAddress, null); MetricsConnection metrics) {
this(configuration, clusterId, localAddress, metrics, null);
} }
/** /**
@ -219,13 +235,14 @@ public class AsyncRpcClient extends AbstractRpcClient {
@Override @Override
protected Pair<Message, CellScanner> call(PayloadCarryingRpcController pcrc, protected Pair<Message, CellScanner> call(PayloadCarryingRpcController pcrc,
Descriptors.MethodDescriptor md, Message param, Message returnType, User ticket, Descriptors.MethodDescriptor md, Message param, Message returnType, User ticket,
InetSocketAddress addr) throws IOException, InterruptedException { InetSocketAddress addr, MetricsConnection.CallStats callStats)
throws IOException, InterruptedException {
if (pcrc == null) { if (pcrc == null) {
pcrc = new PayloadCarryingRpcController(); pcrc = new PayloadCarryingRpcController();
} }
final AsyncRpcChannel connection = createRpcChannel(md.getService().getName(), addr, ticket); final AsyncRpcChannel connection = createRpcChannel(md.getService().getName(), addr, ticket);
Promise<Message> promise = connection.callMethod(md, pcrc, param, returnType); Promise<Message> promise = connection.callMethod(md, pcrc, param, returnType, callStats);
long timeout = pcrc.hasCallTimeout() ? pcrc.getCallTimeout() : 0; long timeout = pcrc.hasCallTimeout() ? pcrc.getCallTimeout() : 0;
try { try {
Message response = timeout > 0 ? promise.get(timeout, TimeUnit.MILLISECONDS) : promise.get(); Message response = timeout > 0 ? promise.get(timeout, TimeUnit.MILLISECONDS) : promise.get();
@ -244,17 +261,24 @@ public class AsyncRpcClient extends AbstractRpcClient {
/** /**
* Call method async * Call method async
*/ */
private void callMethod(Descriptors.MethodDescriptor md, final PayloadCarryingRpcController pcrc, private void callMethod(final Descriptors.MethodDescriptor md,
Message param, Message returnType, User ticket, InetSocketAddress addr, final PayloadCarryingRpcController pcrc, final Message param, Message returnType, User ticket,
final RpcCallback<Message> done) { InetSocketAddress addr, final RpcCallback<Message> done) {
final AsyncRpcChannel connection; final AsyncRpcChannel connection;
try { try {
connection = createRpcChannel(md.getService().getName(), addr, ticket); connection = createRpcChannel(md.getService().getName(), addr, ticket);
final MetricsConnection.CallStats cs = MetricsConnection.newCallStats();
connection.callMethod(md, pcrc, param, returnType).addListener( GenericFutureListener<Future<Message>> listener =
new GenericFutureListener<Future<Message>>() { new GenericFutureListener<Future<Message>>() {
@Override @Override
public void operationComplete(Future<Message> future) throws Exception { public void operationComplete(Future<Message> future) throws Exception {
cs.setCallTimeMs(EnvironmentEdgeManager.currentTime() - cs.getStartTime());
if (metrics != null) {
metrics.updateRpc(md, param, cs);
}
if (LOG.isTraceEnabled()) {
LOG.trace("Call: " + md.getName() + ", callTime: " + cs.getCallTimeMs() + "ms");
}
if (!future.isSuccess()) { if (!future.isSuccess()) {
Throwable cause = future.cause(); Throwable cause = future.cause();
if (cause instanceof IOException) { if (cause instanceof IOException) {
@ -277,7 +301,9 @@ public class AsyncRpcClient extends AbstractRpcClient {
} }
} }
} }
}); };
cs.setStartTime(EnvironmentEdgeManager.currentTime());
connection.callMethod(md, pcrc, param, returnType, cs).addListener(listener);
} catch (StoppedRpcClientException|FailedServerException e) { } catch (StoppedRpcClientException|FailedServerException e) {
pcrc.setFailed(e); pcrc.setFailed(e);
} }

View File

@ -24,8 +24,6 @@ import io.netty.channel.ChannelInboundHandlerAdapter;
import java.io.IOException; import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@ -39,8 +37,6 @@ import com.google.protobuf.Message;
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public class AsyncServerResponseHandler extends ChannelInboundHandlerAdapter { public class AsyncServerResponseHandler extends ChannelInboundHandlerAdapter {
private static final Log LOG = LogFactory.getLog(AsyncServerResponseHandler.class.getName());
private final AsyncRpcChannel channel; private final AsyncRpcChannel channel;
/** /**
@ -102,6 +98,7 @@ public class AsyncServerResponseHandler extends ChannelInboundHandlerAdapter {
cellBlockScanner = channel.client.createCellScanner(cellBlock); cellBlockScanner = channel.client.createCellScanner(cellBlock);
} }
call.setSuccess(value, cellBlockScanner); call.setSuccess(value, cellBlockScanner);
call.callStats.setResponseSizeBytes(totalSize);
} }
} catch (IOException e) { } catch (IOException e) {
// Treat this as a fatal condition and close this connection // Treat this as a fatal condition and close this connection

View File

@ -21,6 +21,7 @@ import com.google.protobuf.Descriptors;
import com.google.protobuf.Message; import com.google.protobuf.Message;
import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.MetricsConnection;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@ -41,16 +42,18 @@ public class Call {
Message responseDefaultType; Message responseDefaultType;
IOException error; // exception, null if value IOException error; // exception, null if value
volatile boolean done; // true when call is done volatile boolean done; // true when call is done
long startTime;
final Descriptors.MethodDescriptor md; final Descriptors.MethodDescriptor md;
final int timeout; // timeout in millisecond for this call; 0 means infinite. final int timeout; // timeout in millisecond for this call; 0 means infinite.
final MetricsConnection.CallStats callStats;
protected Call(int id, final Descriptors.MethodDescriptor md, Message param, protected Call(int id, final Descriptors.MethodDescriptor md, Message param,
final CellScanner cells, final Message responseDefaultType, int timeout) { final CellScanner cells, final Message responseDefaultType, int timeout,
MetricsConnection.CallStats callStats) {
this.param = param; this.param = param;
this.md = md; this.md = md;
this.cells = cells; this.cells = cells;
this.startTime = EnvironmentEdgeManager.currentTime(); this.callStats = callStats;
this.callStats.setStartTime(EnvironmentEdgeManager.currentTime());
this.responseDefaultType = responseDefaultType; this.responseDefaultType = responseDefaultType;
this.id = id; this.id = id;
this.timeout = timeout; this.timeout = timeout;
@ -122,6 +125,6 @@ public class Call {
} }
public long getStartTime() { public long getStartTime() {
return this.startTime; return this.callStats.getStartTime();
} }
} }

View File

@ -17,8 +17,10 @@
*/ */
package org.apache.hadoop.hbase.ipc; package org.apache.hadoop.hbase.ipc;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.MetricsConnection;
import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.hadoop.hbase.util.ReflectionUtils;
import java.net.SocketAddress; import java.net.SocketAddress;
@ -37,13 +39,8 @@ public final class RpcClientFactory {
private RpcClientFactory() { private RpcClientFactory() {
} }
/** /** Helper method for tests only. Creates an {@code RpcClient} without metrics. */
* Creates a new RpcClient by the class defined in the configuration or falls back to @VisibleForTesting
* RpcClientImpl
* @param conf configuration
* @param clusterId the cluster id
* @return newly created RpcClient
*/
public static RpcClient createClient(Configuration conf, String clusterId) { public static RpcClient createClient(Configuration conf, String clusterId) {
return createClient(conf, clusterId, null); return createClient(conf, clusterId, null);
} }
@ -53,17 +50,32 @@ public final class RpcClientFactory {
* RpcClientImpl * RpcClientImpl
* @param conf configuration * @param conf configuration
* @param clusterId the cluster id * @param clusterId the cluster id
* @param localAddr client socket bind address. * @param metrics the connection metrics
* @return newly created RpcClient * @return newly created RpcClient
*/ */
public static RpcClient createClient(Configuration conf, String clusterId, public static RpcClient createClient(Configuration conf, String clusterId,
SocketAddress localAddr) { MetricsConnection metrics) {
return createClient(conf, clusterId, null, metrics);
}
/**
* Creates a new RpcClient by the class defined in the configuration or falls back to
* RpcClientImpl
* @param conf configuration
* @param clusterId the cluster id
* @param localAddr client socket bind address.
* @param metrics the connection metrics
* @return newly created RpcClient
*/
public static RpcClient createClient(Configuration conf, String clusterId,
SocketAddress localAddr, MetricsConnection metrics) {
String rpcClientClass = String rpcClientClass =
conf.get(CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, AsyncRpcClient.class.getName()); conf.get(CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, AsyncRpcClient.class.getName());
return ReflectionUtils.instantiateWithCustomCtor( return ReflectionUtils.instantiateWithCustomCtor(
rpcClientClass, rpcClientClass,
new Class[] { Configuration.class, String.class, SocketAddress.class }, new Class[] { Configuration.class, String.class, SocketAddress.class,
new Object[] { conf, clusterId, localAddr } MetricsConnection.class },
new Object[] { conf, clusterId, localAddr, metrics }
); );
} }
} }

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.hbase.ipc; package org.apache.hadoop.hbase.ipc;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.Descriptors.MethodDescriptor; import com.google.protobuf.Descriptors.MethodDescriptor;
import com.google.protobuf.Message; import com.google.protobuf.Message;
import com.google.protobuf.Message.Builder; import com.google.protobuf.Message.Builder;
@ -32,6 +33,7 @@ import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.MetricsConnection;
import org.apache.hadoop.hbase.codec.Codec; import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.exceptions.ConnectionClosingException; import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@ -911,7 +913,8 @@ public class RpcClientImpl extends AbstractRpcClient {
checkIsOpen(); // Now we're checking that it didn't became idle in between. checkIsOpen(); // Now we're checking that it didn't became idle in between.
try { try {
IPCUtil.write(this.out, header, call.param, cellBlock); call.callStats.setRequestSizeBytes(
IPCUtil.write(this.out, header, call.param, cellBlock));
} catch (IOException e) { } catch (IOException e) {
// We set the value inside the synchronized block, this way the next in line // We set the value inside the synchronized block, this way the next in line
// won't even try to write. Otherwise we might miss a call in the calls map? // won't even try to write. Otherwise we might miss a call in the calls map?
@ -964,12 +967,20 @@ public class RpcClientImpl extends AbstractRpcClient {
int readSoFar = IPCUtil.getTotalSizeWhenWrittenDelimited(responseHeader); int readSoFar = IPCUtil.getTotalSizeWhenWrittenDelimited(responseHeader);
int whatIsLeftToRead = totalSize - readSoFar; int whatIsLeftToRead = totalSize - readSoFar;
IOUtils.skipFully(in, whatIsLeftToRead); IOUtils.skipFully(in, whatIsLeftToRead);
if (call != null) {
call.callStats.setResponseSizeBytes(totalSize);
call.callStats.setCallTimeMs(
EnvironmentEdgeManager.currentTime() - call.callStats.getStartTime());
}
return; return;
} }
if (responseHeader.hasException()) { if (responseHeader.hasException()) {
ExceptionResponse exceptionResponse = responseHeader.getException(); ExceptionResponse exceptionResponse = responseHeader.getException();
RemoteException re = createRemoteException(exceptionResponse); RemoteException re = createRemoteException(exceptionResponse);
call.setException(re); call.setException(re);
call.callStats.setResponseSizeBytes(totalSize);
call.callStats.setCallTimeMs(
EnvironmentEdgeManager.currentTime() - call.callStats.getStartTime());
if (isFatalConnectionException(exceptionResponse)) { if (isFatalConnectionException(exceptionResponse)) {
markClosed(re); markClosed(re);
} }
@ -988,6 +999,9 @@ public class RpcClientImpl extends AbstractRpcClient {
cellBlockScanner = ipcUtil.createCellScanner(this.codec, this.compressor, cellBlock); cellBlockScanner = ipcUtil.createCellScanner(this.codec, this.compressor, cellBlock);
} }
call.setResponse(value, cellBlockScanner); call.setResponse(value, cellBlockScanner);
call.callStats.setResponseSizeBytes(totalSize);
call.callStats.setCallTimeMs(
EnvironmentEdgeManager.currentTime() - call.callStats.getStartTime());
} }
} catch (IOException e) { } catch (IOException e) {
if (expectedCall) call.setException(e); if (expectedCall) call.setException(e);
@ -1075,13 +1089,15 @@ public class RpcClientImpl extends AbstractRpcClient {
} }
/** /**
* Construct an IPC cluster client whose values are of the {@link Message} class. * Used in test only. Construct an IPC cluster client whose values are of the
* {@link Message} class.
* @param conf configuration * @param conf configuration
* @param clusterId the cluster id * @param clusterId the cluster id
* @param factory socket factory * @param factory socket factory
*/ */
@VisibleForTesting
RpcClientImpl(Configuration conf, String clusterId, SocketFactory factory) { RpcClientImpl(Configuration conf, String clusterId, SocketFactory factory) {
this(conf, clusterId, factory, null); this(conf, clusterId, factory, null, null);
} }
/** /**
@ -1090,10 +1106,11 @@ public class RpcClientImpl extends AbstractRpcClient {
* @param clusterId the cluster id * @param clusterId the cluster id
* @param factory socket factory * @param factory socket factory
* @param localAddr client socket bind address * @param localAddr client socket bind address
* @param metrics the connection metrics
*/ */
RpcClientImpl(Configuration conf, String clusterId, SocketFactory factory, RpcClientImpl(Configuration conf, String clusterId, SocketFactory factory,
SocketAddress localAddr) { SocketAddress localAddr, MetricsConnection metrics) {
super(conf, clusterId, localAddr); super(conf, clusterId, localAddr, metrics);
this.socketFactory = factory; this.socketFactory = factory;
this.connections = new PoolMap<ConnectionId, Connection>(getPoolType(conf), getPoolSize(conf)); this.connections = new PoolMap<ConnectionId, Connection>(getPoolType(conf), getPoolSize(conf));
@ -1101,25 +1118,27 @@ public class RpcClientImpl extends AbstractRpcClient {
} }
/** /**
* Construct an IPC client for the cluster <code>clusterId</code> with the default SocketFactory * Used in test only. Construct an IPC client for the cluster {@code clusterId} with
* @param conf configuration * the default SocketFactory
* @param clusterId the cluster id
*/ */
public RpcClientImpl(Configuration conf, String clusterId) { @VisibleForTesting
this(conf, clusterId, NetUtils.getDefaultSocketFactory(conf), null); RpcClientImpl(Configuration conf, String clusterId) {
this(conf, clusterId, NetUtils.getDefaultSocketFactory(conf), null, null);
} }
/** /**
* Construct an IPC client for the cluster <code>clusterId</code> with the default SocketFactory * Construct an IPC client for the cluster {@code clusterId} with the default SocketFactory
* *
* This method is called with reflection by the RpcClientFactory to create an instance * This method is called with reflection by the RpcClientFactory to create an instance
* *
* @param conf configuration * @param conf configuration
* @param clusterId the cluster id * @param clusterId the cluster id
* @param localAddr client socket bind address. * @param localAddr client socket bind address.
* @param metrics the connection metrics
*/ */
public RpcClientImpl(Configuration conf, String clusterId, SocketAddress localAddr) { public RpcClientImpl(Configuration conf, String clusterId, SocketAddress localAddr,
this(conf, clusterId, NetUtils.getDefaultSocketFactory(conf), localAddr); MetricsConnection metrics) {
this(conf, clusterId, NetUtils.getDefaultSocketFactory(conf), localAddr, metrics);
} }
/** Stop all threads related to this client. No further calls may be made /** Stop all threads related to this client. No further calls may be made
@ -1182,7 +1201,8 @@ public class RpcClientImpl extends AbstractRpcClient {
*/ */
@Override @Override
protected Pair<Message, CellScanner> call(PayloadCarryingRpcController pcrc, MethodDescriptor md, protected Pair<Message, CellScanner> call(PayloadCarryingRpcController pcrc, MethodDescriptor md,
Message param, Message returnType, User ticket, InetSocketAddress addr) Message param, Message returnType, User ticket, InetSocketAddress addr,
MetricsConnection.CallStats callStats)
throws IOException, InterruptedException { throws IOException, InterruptedException {
if (pcrc == null) { if (pcrc == null) {
pcrc = new PayloadCarryingRpcController(); pcrc = new PayloadCarryingRpcController();
@ -1190,7 +1210,7 @@ public class RpcClientImpl extends AbstractRpcClient {
CellScanner cells = pcrc.cellScanner(); CellScanner cells = pcrc.cellScanner();
final Call call = new Call(this.callIdCnt.getAndIncrement(), md, param, cells, returnType, final Call call = new Call(this.callIdCnt.getAndIncrement(), md, param, cells, returnType,
pcrc.getCallTimeout()); pcrc.getCallTimeout(), MetricsConnection.newCallStats());
final Connection connection = getConnection(ticket, call, addr); final Connection connection = getConnection(ticket, call, addr);

View File

@ -0,0 +1,120 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import com.google.protobuf.ByteString;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MetricsTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import java.io.IOException;
@Category({ClientTests.class, MetricsTests.class, SmallTests.class})
public class TestMetricsConnection {
private static MetricsConnection METRICS;
@BeforeClass
public static void beforeClass() {
ConnectionImplementation mocked = Mockito.mock(ConnectionImplementation.class);
Mockito.when(mocked.toString()).thenReturn("mocked-connection");
METRICS = new MetricsConnection(Mockito.mock(ConnectionImplementation.class));
}
@AfterClass
public static void afterClass() {
METRICS.shutdown();
}
@Test
public void testStaticMetrics() throws IOException {
final byte[] foo = Bytes.toBytes("foo");
final RegionSpecifier region = RegionSpecifier.newBuilder()
.setValue(ByteString.EMPTY)
.setType(RegionSpecifierType.REGION_NAME)
.build();
final int loop = 5;
for (int i = 0; i < loop; i++) {
METRICS.updateRpc(
ClientService.getDescriptor().findMethodByName("Get"),
GetRequest.getDefaultInstance(),
MetricsConnection.newCallStats());
METRICS.updateRpc(
ClientService.getDescriptor().findMethodByName("Scan"),
ScanRequest.getDefaultInstance(),
MetricsConnection.newCallStats());
METRICS.updateRpc(
ClientService.getDescriptor().findMethodByName("Multi"),
MultiRequest.getDefaultInstance(),
MetricsConnection.newCallStats());
METRICS.updateRpc(
ClientService.getDescriptor().findMethodByName("Mutate"),
MutateRequest.newBuilder()
.setMutation(ProtobufUtil.toMutation(MutationType.APPEND, new Append(foo)))
.setRegion(region)
.build(),
MetricsConnection.newCallStats());
METRICS.updateRpc(
ClientService.getDescriptor().findMethodByName("Mutate"),
MutateRequest.newBuilder()
.setMutation(ProtobufUtil.toMutation(MutationType.DELETE, new Delete(foo)))
.setRegion(region)
.build(),
MetricsConnection.newCallStats());
METRICS.updateRpc(
ClientService.getDescriptor().findMethodByName("Mutate"),
MutateRequest.newBuilder()
.setMutation(ProtobufUtil.toMutation(MutationType.INCREMENT, new Increment(foo)))
.setRegion(region)
.build(),
MetricsConnection.newCallStats());
METRICS.updateRpc(
ClientService.getDescriptor().findMethodByName("Mutate"),
MutateRequest.newBuilder()
.setMutation(ProtobufUtil.toMutation(MutationType.PUT, new Put(foo)))
.setRegion(region)
.build(),
MetricsConnection.newCallStats());
}
for (MetricsConnection.CallTracker t : new MetricsConnection.CallTracker[] {
METRICS.getTracker, METRICS.scanTracker, METRICS.multiTracker, METRICS.appendTracker,
METRICS.deleteTracker, METRICS.incrementTracker, METRICS.putTracker
}) {
Assert.assertEquals("Failed to invoke callTimer on " + t, loop, t.callTimer.count());
Assert.assertEquals("Failed to invoke reqHist on " + t, loop, t.reqHist.count());
Assert.assertEquals("Failed to invoke respHist on " + t, loop, t.respHist.count());
}
}
}

View File

@ -851,7 +851,7 @@ public class HRegionServer extends HasThread implements
// Setup RPC client for master communication // Setup RPC client for master communication
rpcClient = RpcClientFactory.createClient(conf, clusterId, new InetSocketAddress( rpcClient = RpcClientFactory.createClient(conf, clusterId, new InetSocketAddress(
rpcServices.isa.getAddress(), 0)); rpcServices.isa.getAddress(), 0), clusterConnection.getConnectionMetrics());
boolean onlyMetaRefresh = false; boolean onlyMetaRefresh = false;
int storefileRefreshPeriod = conf.getInt( int storefileRefreshPeriod = conf.getInt(

View File

@ -17,6 +17,7 @@
*/ */
package org.apache.hadoop.hbase.regionserver; package org.apache.hadoop.hbase.regionserver;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.CompatibilitySingletonFactory; import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
@ -48,7 +49,7 @@ public class MetricsRegionServer {
this.serverSource = serverSource; this.serverSource = serverSource;
} }
// for unit-test usage @VisibleForTesting
public MetricsRegionServerSource getMetricsSource() { public MetricsRegionServerSource getMetricsSource() {
return serverSource; return serverSource;
} }

View File

@ -135,8 +135,9 @@ public class TestClientTimeouts {
* Rpc Channel implementation with RandomTimeoutBlockingRpcChannel * Rpc Channel implementation with RandomTimeoutBlockingRpcChannel
*/ */
public static class RandomTimeoutRpcClient extends RpcClientImpl { public static class RandomTimeoutRpcClient extends RpcClientImpl {
public RandomTimeoutRpcClient(Configuration conf, String clusterId, SocketAddress localAddr) { public RandomTimeoutRpcClient(Configuration conf, String clusterId, SocketAddress localAddr,
super(conf, clusterId, localAddr); MetricsConnection metrics) {
super(conf, clusterId, localAddr, metrics);
} }
// Return my own instance, one that does random timeouts // Return my own instance, one that does random timeouts

View File

@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.MetricsConnection;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto; import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto; import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto; import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto;
@ -163,7 +164,8 @@ public abstract class AbstractTestIPC {
final String message = "hello"; final String message = "hello";
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage(message).build(); EchoRequestProto param = EchoRequestProto.newBuilder().setMessage(message).build();
Pair<Message, CellScanner> r = Pair<Message, CellScanner> r =
client.call(null, md, param, md.getOutputType().toProto(), User.getCurrent(), address); client.call(null, md, param, md.getOutputType().toProto(), User.getCurrent(), address,
new MetricsConnection.CallStats());
assertTrue(r.getSecond() == null); assertTrue(r.getSecond() == null);
// Silly assertion that the message is in the returned pb. // Silly assertion that the message is in the returned pb.
assertTrue(r.getFirst().toString().contains(message)); assertTrue(r.getFirst().toString().contains(message));
@ -205,7 +207,8 @@ public abstract class AbstractTestIPC {
PayloadCarryingRpcController pcrc = PayloadCarryingRpcController pcrc =
new PayloadCarryingRpcController(CellUtil.createCellScanner(cells)); new PayloadCarryingRpcController(CellUtil.createCellScanner(cells));
Pair<Message, CellScanner> r = Pair<Message, CellScanner> r =
client.call(pcrc, md, param, md.getOutputType().toProto(), User.getCurrent(), address); client.call(pcrc, md, param, md.getOutputType().toProto(), User.getCurrent(), address,
new MetricsConnection.CallStats());
int index = 0; int index = 0;
while (r.getSecond().advance()) { while (r.getSecond().advance()) {
assertTrue(CELL.equals(r.getSecond().current())); assertTrue(CELL.equals(r.getSecond().current()));
@ -231,7 +234,8 @@ public abstract class AbstractTestIPC {
InetSocketAddress address = rpcServer.getListenerAddress(); InetSocketAddress address = rpcServer.getListenerAddress();
MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo"); MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build(); EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
client.call(null, md, param, null, User.getCurrent(), address); client.call(null, md, param, null, User.getCurrent(), address,
new MetricsConnection.CallStats());
fail("Expected an exception to have been thrown!"); fail("Expected an exception to have been thrown!");
} catch (Exception e) { } catch (Exception e) {
LOG.info("Caught expected exception: " + e.toString()); LOG.info("Caught expected exception: " + e.toString());
@ -255,10 +259,10 @@ public abstract class AbstractTestIPC {
MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo"); MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build(); EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
client.call( client.call(new PayloadCarryingRpcController(
new PayloadCarryingRpcController( CellUtil.createCellScanner(ImmutableList.<Cell> of(CELL))), md, param,
CellUtil.createCellScanner(ImmutableList.<Cell> of(CELL))), md, param, md md.getOutputType().toProto(), User.getCurrent(), rpcServer.getListenerAddress(),
.getOutputType().toProto(), User.getCurrent(), rpcServer.getListenerAddress()); new MetricsConnection.CallStats());
} }
verify(scheduler, times(10)).dispatch((CallRunner) anyObject()); verify(scheduler, times(10)).dispatch((CallRunner) anyObject());
} finally { } finally {

View File

@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.MetricsConnection;
import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RowMutations; import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.codec.Codec; import org.apache.hadoop.hbase.codec.Codec;
@ -116,7 +117,7 @@ public class TestAsyncIPC extends AbstractTestIPC {
@Override @Override
protected AsyncRpcClient createRpcClientNoCodec(Configuration conf) { protected AsyncRpcClient createRpcClientNoCodec(Configuration conf) {
setConf(conf); setConf(conf);
return new AsyncRpcClient(conf, HConstants.CLUSTER_ID_DEFAULT, null) { return new AsyncRpcClient(conf) {
@Override @Override
Codec getCodec() { Codec getCodec() {
@ -129,15 +130,13 @@ public class TestAsyncIPC extends AbstractTestIPC {
@Override @Override
protected AsyncRpcClient createRpcClient(Configuration conf) { protected AsyncRpcClient createRpcClient(Configuration conf) {
setConf(conf); setConf(conf);
return new AsyncRpcClient(conf, HConstants.CLUSTER_ID_DEFAULT, null); return new AsyncRpcClient(conf);
} }
@Override @Override
protected AsyncRpcClient createRpcClientRTEDuringConnectionSetup(Configuration conf) { protected AsyncRpcClient createRpcClientRTEDuringConnectionSetup(Configuration conf) {
setConf(conf); setConf(conf);
return new AsyncRpcClient(conf, HConstants.CLUSTER_ID_DEFAULT, null, return new AsyncRpcClient(conf, new ChannelInitializer<SocketChannel>() {
new ChannelInitializer<SocketChannel>() {
@Override @Override
protected void initChannel(SocketChannel ch) throws Exception { protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addFirst(new ChannelOutboundHandlerAdapter() { ch.pipeline().addFirst(new ChannelOutboundHandlerAdapter() {
@ -248,7 +247,7 @@ public class TestAsyncIPC extends AbstractTestIPC {
TestRpcServer rpcServer = new TestRpcServer(); TestRpcServer rpcServer = new TestRpcServer();
MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo"); MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build(); EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
AsyncRpcClient client = new AsyncRpcClient(conf, HConstants.CLUSTER_ID_DEFAULT, null); AsyncRpcClient client = new AsyncRpcClient(conf);
KeyValue kv = BIG_CELL; KeyValue kv = BIG_CELL;
Put p = new Put(CellUtil.cloneRow(kv)); Put p = new Put(CellUtil.cloneRow(kv));
for (int i = 0; i < cellcount; i++) { for (int i = 0; i < cellcount; i++) {
@ -282,7 +281,8 @@ public class TestAsyncIPC extends AbstractTestIPC {
PayloadCarryingRpcController pcrc = PayloadCarryingRpcController pcrc =
new PayloadCarryingRpcController(CellUtil.createCellScanner(cells)); new PayloadCarryingRpcController(CellUtil.createCellScanner(cells));
// Pair<Message, CellScanner> response = // Pair<Message, CellScanner> response =
client.call(pcrc, md, builder.build(), param, user, address); client.call(pcrc, md, builder.build(), param, user, address,
new MetricsConnection.CallStats());
/* /*
* int count = 0; while (p.getSecond().advance()) { count++; } assertEquals(cells.size(), * int count = 0; while (p.getSecond().advance()) { count++; } assertEquals(cells.size(),
* count); * count);

View File

@ -37,15 +37,15 @@ public class TestGlobalEventLoopGroup {
public void test() { public void test() {
Configuration conf = HBaseConfiguration.create(); Configuration conf = HBaseConfiguration.create();
conf.setBoolean(AsyncRpcClient.USE_GLOBAL_EVENT_LOOP_GROUP, true); conf.setBoolean(AsyncRpcClient.USE_GLOBAL_EVENT_LOOP_GROUP, true);
AsyncRpcClient client = new AsyncRpcClient(conf, HConstants.CLUSTER_ID_DEFAULT, null); AsyncRpcClient client = new AsyncRpcClient(conf);
assertNotNull(AsyncRpcClient.GLOBAL_EVENT_LOOP_GROUP); assertNotNull(AsyncRpcClient.GLOBAL_EVENT_LOOP_GROUP);
AsyncRpcClient client1 = new AsyncRpcClient(conf, HConstants.CLUSTER_ID_DEFAULT, null); AsyncRpcClient client1 = new AsyncRpcClient(conf);
assertSame(client.bootstrap.group(), client1.bootstrap.group()); assertSame(client.bootstrap.group(), client1.bootstrap.group());
client1.close(); client1.close();
assertFalse(client.bootstrap.group().isShuttingDown()); assertFalse(client.bootstrap.group().isShuttingDown());
conf.setBoolean(AsyncRpcClient.USE_GLOBAL_EVENT_LOOP_GROUP, false); conf.setBoolean(AsyncRpcClient.USE_GLOBAL_EVENT_LOOP_GROUP, false);
AsyncRpcClient client2 = new AsyncRpcClient(conf, HConstants.CLUSTER_ID_DEFAULT, null); AsyncRpcClient client2 = new AsyncRpcClient(conf);
assertNotSame(client.bootstrap.group(), client2.bootstrap.group()); assertNotSame(client.bootstrap.group(), client2.bootstrap.group());
client2.close(); client2.close();

View File

@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.MetricsConnection;
import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RowMutations; import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.codec.Codec; import org.apache.hadoop.hbase.codec.Codec;
@ -145,7 +146,8 @@ public class TestIPC extends AbstractTestIPC {
PayloadCarryingRpcController pcrc = PayloadCarryingRpcController pcrc =
new PayloadCarryingRpcController(CellUtil.createCellScanner(cells)); new PayloadCarryingRpcController(CellUtil.createCellScanner(cells));
// Pair<Message, CellScanner> response = // Pair<Message, CellScanner> response =
client.call(pcrc, md, builder.build(), param, user, address); client.call(pcrc, md, builder.build(), param, user, address,
new MetricsConnection.CallStats());
/* /*
* int count = 0; while (p.getSecond().advance()) { count++; } assertEquals(cells.size(), * int count = 0; while (p.getSecond().advance()) { count++; } assertEquals(cells.size(),
* count); * count);

View File

@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.MetricsConnection;
import org.apache.hadoop.hbase.client.RetriesExhaustedException; import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.codec.Codec; import org.apache.hadoop.hbase.codec.Codec;
@ -58,8 +59,9 @@ public class TestRpcClientLeaks {
super(conf, clusterId); super(conf, clusterId);
} }
public MyRpcClientImpl(Configuration conf, String clusterId, SocketAddress address) { public MyRpcClientImpl(Configuration conf, String clusterId, SocketAddress address,
super(conf, clusterId, address); MetricsConnection metrics) {
super(conf, clusterId, address, metrics);
} }
@Override @Override

View File

@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.MetricsConnection;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto; import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto; import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto; import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto;
@ -181,7 +182,7 @@ public class TestRpcHandlerException {
new PayloadCarryingRpcController(CellUtil.createCellScanner(ImmutableList.of(CELL))); new PayloadCarryingRpcController(CellUtil.createCellScanner(ImmutableList.of(CELL)));
client.call(controller, md, param, md.getOutputType().toProto(), User.getCurrent(), client.call(controller, md, param, md.getOutputType().toProto(), User.getCurrent(),
rpcServer.getListenerAddress()); rpcServer.getListenerAddress(), new MetricsConnection.CallStats());
} catch (Throwable e) { } catch (Throwable e) {
assert(abortable.isAborted() == true); assert(abortable.isAborted() == true);
} finally { } finally {