HBASE-5298 Add thrift metrics to thrift2
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1241974 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
10f108630f
commit
7ef8f6e307
|
@ -157,13 +157,6 @@ public class TBoundedThreadPoolServer extends TServer {
|
||||||
serverOptions = options;
|
serverOptions = options;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Return the server working queue
|
|
||||||
*/
|
|
||||||
public CallQueue getCallQueue() {
|
|
||||||
return this.callQueue;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void serve() {
|
public void serve() {
|
||||||
try {
|
try {
|
||||||
serverTransport_.listen();
|
serverTransport_.listen();
|
||||||
|
|
|
@ -61,7 +61,7 @@ public class ThriftMetrics implements Updater {
|
||||||
private MetricsTimeVaryingRate slowThriftCall =
|
private MetricsTimeVaryingRate slowThriftCall =
|
||||||
new MetricsTimeVaryingRate("slowThriftCall", registry);
|
new MetricsTimeVaryingRate("slowThriftCall", registry);
|
||||||
|
|
||||||
public ThriftMetrics(int port, Configuration conf) {
|
public ThriftMetrics(int port, Configuration conf, Class<?> iface) {
|
||||||
slowResponseTime = conf.getLong(
|
slowResponseTime = conf.getLong(
|
||||||
SLOW_RESPONSE_NANO_SEC, DEFAULT_SLOW_RESPONSE_NANO_SEC);
|
SLOW_RESPONSE_NANO_SEC, DEFAULT_SLOW_RESPONSE_NANO_SEC);
|
||||||
context = MetricsUtil.getContext(CONTEXT_NAME);
|
context = MetricsUtil.getContext(CONTEXT_NAME);
|
||||||
|
@ -73,7 +73,7 @@ public class ThriftMetrics implements Updater {
|
||||||
|
|
||||||
context.registerUpdater(this);
|
context.registerUpdater(this);
|
||||||
|
|
||||||
createMetricsForMethods(Hbase.Iface.class);
|
createMetricsForMethods(iface);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void incTimeInQueue(long time) {
|
public void incTimeInQueue(long time) {
|
||||||
|
|
|
@ -123,7 +123,6 @@ public class ThriftServerRunner implements Runnable {
|
||||||
volatile TServer tserver;
|
volatile TServer tserver;
|
||||||
private final Hbase.Iface handler;
|
private final Hbase.Iface handler;
|
||||||
private final ThriftMetrics metrics;
|
private final ThriftMetrics metrics;
|
||||||
private CallQueue callQueue;
|
|
||||||
|
|
||||||
/** An enum of server implementation selections */
|
/** An enum of server implementation selections */
|
||||||
enum ImplType {
|
enum ImplType {
|
||||||
|
@ -227,7 +226,7 @@ public class ThriftServerRunner implements Runnable {
|
||||||
public ThriftServerRunner(Configuration conf, HBaseHandler handler) {
|
public ThriftServerRunner(Configuration conf, HBaseHandler handler) {
|
||||||
this.conf = HBaseConfiguration.create(conf);
|
this.conf = HBaseConfiguration.create(conf);
|
||||||
this.listenPort = conf.getInt(PORT_CONF_KEY, DEFAULT_LISTEN_PORT);
|
this.listenPort = conf.getInt(PORT_CONF_KEY, DEFAULT_LISTEN_PORT);
|
||||||
this.metrics = new ThriftMetrics(listenPort, conf);
|
this.metrics = new ThriftMetrics(listenPort, conf, Hbase.Iface.class);
|
||||||
this.handler = HbaseHandlerMetricsProxy.newInstance(handler, metrics, conf);
|
this.handler = HbaseHandlerMetricsProxy.newInstance(handler, metrics, conf);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -305,9 +304,10 @@ public class ThriftServerRunner implements Runnable {
|
||||||
tserver = new TNonblockingServer(serverArgs);
|
tserver = new TNonblockingServer(serverArgs);
|
||||||
} else if (implType == ImplType.HS_HA) {
|
} else if (implType == ImplType.HS_HA) {
|
||||||
THsHaServer.Args serverArgs = new THsHaServer.Args(serverTransport);
|
THsHaServer.Args serverArgs = new THsHaServer.Args(serverTransport);
|
||||||
this.callQueue = new CallQueue(new LinkedBlockingQueue<Call>(), metrics);
|
CallQueue callQueue =
|
||||||
|
new CallQueue(new LinkedBlockingQueue<Call>(), metrics);
|
||||||
ExecutorService executorService = createExecutor(
|
ExecutorService executorService = createExecutor(
|
||||||
this.callQueue, serverArgs.getWorkerThreads());
|
callQueue, serverArgs.getWorkerThreads());
|
||||||
serverArgs.executorService(executorService)
|
serverArgs.executorService(executorService)
|
||||||
.processor(processor)
|
.processor(processor)
|
||||||
.transportFactory(transportFactory)
|
.transportFactory(transportFactory)
|
||||||
|
@ -316,9 +316,10 @@ public class ThriftServerRunner implements Runnable {
|
||||||
} else { // THREADED_SELECTOR
|
} else { // THREADED_SELECTOR
|
||||||
TThreadedSelectorServer.Args serverArgs =
|
TThreadedSelectorServer.Args serverArgs =
|
||||||
new HThreadedSelectorServerArgs(serverTransport, conf);
|
new HThreadedSelectorServerArgs(serverTransport, conf);
|
||||||
this.callQueue = new CallQueue(new LinkedBlockingQueue<Call>(), metrics);
|
CallQueue callQueue =
|
||||||
|
new CallQueue(new LinkedBlockingQueue<Call>(), metrics);
|
||||||
ExecutorService executorService = createExecutor(
|
ExecutorService executorService = createExecutor(
|
||||||
this.callQueue, serverArgs.getWorkerThreads());
|
callQueue, serverArgs.getWorkerThreads());
|
||||||
serverArgs.executorService(executorService)
|
serverArgs.executorService(executorService)
|
||||||
.processor(processor)
|
.processor(processor)
|
||||||
.transportFactory(transportFactory)
|
.transportFactory(transportFactory)
|
||||||
|
@ -344,7 +345,6 @@ public class ThriftServerRunner implements Runnable {
|
||||||
+ "; " + serverArgs);
|
+ "; " + serverArgs);
|
||||||
TBoundedThreadPoolServer tserver =
|
TBoundedThreadPoolServer tserver =
|
||||||
new TBoundedThreadPoolServer(serverArgs, metrics);
|
new TBoundedThreadPoolServer(serverArgs, metrics);
|
||||||
this.callQueue = tserver.getCallQueue();
|
|
||||||
this.tserver = tserver;
|
this.tserver = tserver;
|
||||||
} else {
|
} else {
|
||||||
throw new AssertionError("Unsupported Thrift server implementation: " +
|
throw new AssertionError("Unsupported Thrift server implementation: " +
|
||||||
|
|
|
@ -19,7 +19,28 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.thrift2;
|
package org.apache.hadoop.hbase.thrift2;
|
||||||
|
|
||||||
import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.*;
|
import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.deleteFromThrift;
|
||||||
|
import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.deletesFromHBase;
|
||||||
|
import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.deletesFromThrift;
|
||||||
|
import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.getFromThrift;
|
||||||
|
import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.getsFromThrift;
|
||||||
|
import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.incrementFromThrift;
|
||||||
|
import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.putFromThrift;
|
||||||
|
import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.putsFromThrift;
|
||||||
|
import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.resultFromHBase;
|
||||||
|
import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.resultsFromHBase;
|
||||||
|
import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.scanFromThrift;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.lang.reflect.InvocationHandler;
|
||||||
|
import java.lang.reflect.InvocationTargetException;
|
||||||
|
import java.lang.reflect.Method;
|
||||||
|
import java.lang.reflect.Proxy;
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
@ -28,6 +49,7 @@ import org.apache.hadoop.hbase.client.Delete;
|
||||||
import org.apache.hadoop.hbase.client.HTableInterface;
|
import org.apache.hadoop.hbase.client.HTableInterface;
|
||||||
import org.apache.hadoop.hbase.client.HTablePool;
|
import org.apache.hadoop.hbase.client.HTablePool;
|
||||||
import org.apache.hadoop.hbase.client.ResultScanner;
|
import org.apache.hadoop.hbase.client.ResultScanner;
|
||||||
|
import org.apache.hadoop.hbase.thrift.ThriftMetrics;
|
||||||
import org.apache.hadoop.hbase.thrift2.generated.TDelete;
|
import org.apache.hadoop.hbase.thrift2.generated.TDelete;
|
||||||
import org.apache.hadoop.hbase.thrift2.generated.TGet;
|
import org.apache.hadoop.hbase.thrift2.generated.TGet;
|
||||||
import org.apache.hadoop.hbase.thrift2.generated.THBaseService;
|
import org.apache.hadoop.hbase.thrift2.generated.THBaseService;
|
||||||
|
@ -39,13 +61,6 @@ import org.apache.hadoop.hbase.thrift2.generated.TResult;
|
||||||
import org.apache.hadoop.hbase.thrift2.generated.TScan;
|
import org.apache.hadoop.hbase.thrift2.generated.TScan;
|
||||||
import org.apache.thrift.TException;
|
import org.apache.thrift.TException;
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.nio.ByteBuffer;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This class is a glue object that connects Thrift RPC calls to the HBase client API primarily defined in the
|
* This class is a glue object that connects Thrift RPC calls to the HBase client API primarily defined in the
|
||||||
* HTableInterface.
|
* HTableInterface.
|
||||||
|
@ -61,7 +76,49 @@ public class ThriftHBaseServiceHandler implements THBaseService.Iface {
|
||||||
private final AtomicInteger nextScannerId = new AtomicInteger(0);
|
private final AtomicInteger nextScannerId = new AtomicInteger(0);
|
||||||
private final Map<Integer, ResultScanner> scannerMap = new ConcurrentHashMap<Integer, ResultScanner>();
|
private final Map<Integer, ResultScanner> scannerMap = new ConcurrentHashMap<Integer, ResultScanner>();
|
||||||
|
|
||||||
public ThriftHBaseServiceHandler(Configuration conf) {
|
public static THBaseService.Iface newInstance(
|
||||||
|
Configuration conf, ThriftMetrics metrics) {
|
||||||
|
THBaseService.Iface handler = new ThriftHBaseServiceHandler(conf);
|
||||||
|
return (THBaseService.Iface) Proxy.newProxyInstance(
|
||||||
|
handler.getClass().getClassLoader(),
|
||||||
|
handler.getClass().getInterfaces(),
|
||||||
|
new THBaseServiceMetricsProxy(handler, metrics));
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class THBaseServiceMetricsProxy implements InvocationHandler {
|
||||||
|
private final THBaseService.Iface handler;
|
||||||
|
private final ThriftMetrics metrics;
|
||||||
|
|
||||||
|
private THBaseServiceMetricsProxy(
|
||||||
|
THBaseService.Iface handler, ThriftMetrics metrics) {
|
||||||
|
this.handler = handler;
|
||||||
|
this.metrics = metrics;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Object invoke(Object proxy, Method m, Object[] args)
|
||||||
|
throws Throwable {
|
||||||
|
Object result;
|
||||||
|
try {
|
||||||
|
long start = now();
|
||||||
|
result = m.invoke(handler, args);
|
||||||
|
int processTime = (int)(now() - start);
|
||||||
|
metrics.incMethodTime(m.getName(), processTime);
|
||||||
|
} catch (InvocationTargetException e) {
|
||||||
|
throw e.getTargetException();
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new RuntimeException(
|
||||||
|
"unexpected invocation exception: " + e.getMessage());
|
||||||
|
}
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static long now() {
|
||||||
|
return System.nanoTime();
|
||||||
|
}
|
||||||
|
|
||||||
|
ThriftHBaseServiceHandler(Configuration conf) {
|
||||||
htablePool = new HTablePool(conf, Integer.MAX_VALUE);
|
htablePool = new HTablePool(conf, Integer.MAX_VALUE);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -19,6 +19,15 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.thrift2;
|
package org.apache.hadoop.hbase.thrift2;
|
||||||
|
|
||||||
|
import java.net.InetAddress;
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
|
import java.net.UnknownHostException;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import org.apache.commons.cli.CommandLine;
|
import org.apache.commons.cli.CommandLine;
|
||||||
import org.apache.commons.cli.CommandLineParser;
|
import org.apache.commons.cli.CommandLineParser;
|
||||||
import org.apache.commons.cli.HelpFormatter;
|
import org.apache.commons.cli.HelpFormatter;
|
||||||
|
@ -29,7 +38,11 @@ import org.apache.commons.cli.ParseException;
|
||||||
import org.apache.commons.cli.PosixParser;
|
import org.apache.commons.cli.PosixParser;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||||
|
import org.apache.hadoop.hbase.thrift.CallQueue;
|
||||||
|
import org.apache.hadoop.hbase.thrift.CallQueue.Call;
|
||||||
|
import org.apache.hadoop.hbase.thrift.ThriftMetrics;
|
||||||
import org.apache.hadoop.hbase.thrift2.generated.THBaseService;
|
import org.apache.hadoop.hbase.thrift2.generated.THBaseService;
|
||||||
import org.apache.thrift.protocol.TBinaryProtocol;
|
import org.apache.thrift.protocol.TBinaryProtocol;
|
||||||
import org.apache.thrift.protocol.TCompactProtocol;
|
import org.apache.thrift.protocol.TCompactProtocol;
|
||||||
|
@ -46,19 +59,16 @@ import org.apache.thrift.transport.TServerTransport;
|
||||||
import org.apache.thrift.transport.TTransportException;
|
import org.apache.thrift.transport.TTransportException;
|
||||||
import org.apache.thrift.transport.TTransportFactory;
|
import org.apache.thrift.transport.TTransportFactory;
|
||||||
|
|
||||||
import java.net.InetAddress;
|
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||||
import java.net.InetSocketAddress;
|
|
||||||
import java.net.UnknownHostException;
|
|
||||||
import java.util.List;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* ThriftServer - this class starts up a Thrift server which implements the HBase API specified in the
|
* ThriftServer - this class starts up a Thrift server which implements the HBase API specified in the
|
||||||
* HbaseClient.thrift IDL file.
|
* HbaseClient.thrift IDL file.
|
||||||
*/
|
*/
|
||||||
public class ThriftServer {
|
public class ThriftServer {
|
||||||
private static final Log log = LogFactory.getLog("ThriftServer");
|
private static final Log log = LogFactory.getLog(ThriftServer.class);
|
||||||
|
|
||||||
private static final String DEFAULT_LISTEN_PORT = "9090";
|
public static final String DEFAULT_LISTEN_PORT = "9090";
|
||||||
|
|
||||||
public ThriftServer() {
|
public ThriftServer() {
|
||||||
}
|
}
|
||||||
|
@ -141,17 +151,33 @@ public class ThriftServer {
|
||||||
return new TNonblockingServer(serverArgs);
|
return new TNonblockingServer(serverArgs);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static TServer getTHsHaServer(TProtocolFactory protocolFactory, THBaseService.Processor processor,
|
private static TServer getTHsHaServer(TProtocolFactory protocolFactory,
|
||||||
TTransportFactory transportFactory, InetSocketAddress inetSocketAddress) throws TTransportException {
|
THBaseService.Processor processor, TTransportFactory transportFactory,
|
||||||
|
InetSocketAddress inetSocketAddress, ThriftMetrics metrics)
|
||||||
|
throws TTransportException {
|
||||||
TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(inetSocketAddress);
|
TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(inetSocketAddress);
|
||||||
log.info("starting HBase HsHA Thrift server on " + inetSocketAddress.toString());
|
log.info("starting HBase HsHA Thrift server on " + inetSocketAddress.toString());
|
||||||
THsHaServer.Args serverArgs = new THsHaServer.Args(serverTransport);
|
THsHaServer.Args serverArgs = new THsHaServer.Args(serverTransport);
|
||||||
|
ExecutorService executorService = createExecutor(
|
||||||
|
serverArgs.getWorkerThreads(), metrics);
|
||||||
|
serverArgs.executorService(executorService);
|
||||||
serverArgs.processor(processor);
|
serverArgs.processor(processor);
|
||||||
serverArgs.transportFactory(transportFactory);
|
serverArgs.transportFactory(transportFactory);
|
||||||
serverArgs.protocolFactory(protocolFactory);
|
serverArgs.protocolFactory(protocolFactory);
|
||||||
return new THsHaServer(serverArgs);
|
return new THsHaServer(serverArgs);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static ExecutorService createExecutor(
|
||||||
|
int workerThreads, ThriftMetrics metrics) {
|
||||||
|
CallQueue callQueue = new CallQueue(
|
||||||
|
new LinkedBlockingQueue<Call>(), metrics);
|
||||||
|
ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
|
||||||
|
tfb.setDaemon(true);
|
||||||
|
tfb.setNameFormat("thrift2-worker-%d");
|
||||||
|
return new ThreadPoolExecutor(workerThreads, workerThreads,
|
||||||
|
Long.MAX_VALUE, TimeUnit.SECONDS, callQueue, tfb.build());
|
||||||
|
}
|
||||||
|
|
||||||
private static TServer getTThreadPoolServer(TProtocolFactory protocolFactory, THBaseService.Processor processor,
|
private static TServer getTThreadPoolServer(TProtocolFactory protocolFactory, THBaseService.Processor processor,
|
||||||
TTransportFactory transportFactory, InetSocketAddress inetSocketAddress) throws TTransportException {
|
TTransportFactory transportFactory, InetSocketAddress inetSocketAddress) throws TTransportException {
|
||||||
TServerTransport serverTransport = new TServerSocket(inetSocketAddress);
|
TServerTransport serverTransport = new TServerSocket(inetSocketAddress);
|
||||||
|
@ -195,10 +221,14 @@ public class ThriftServer {
|
||||||
boolean nonblocking = cmd.hasOption("nonblocking");
|
boolean nonblocking = cmd.hasOption("nonblocking");
|
||||||
boolean hsha = cmd.hasOption("hsha");
|
boolean hsha = cmd.hasOption("hsha");
|
||||||
|
|
||||||
|
Configuration conf = HBaseConfiguration.create();
|
||||||
|
ThriftMetrics metrics = new ThriftMetrics(
|
||||||
|
listenPort, conf, THBaseService.Iface.class);
|
||||||
|
|
||||||
// Construct correct ProtocolFactory
|
// Construct correct ProtocolFactory
|
||||||
TProtocolFactory protocolFactory = getTProtocolFactory(cmd.hasOption("compact"));
|
TProtocolFactory protocolFactory = getTProtocolFactory(cmd.hasOption("compact"));
|
||||||
THBaseService.Iface handler = new ThriftHBaseServiceHandler(
|
THBaseService.Iface handler =
|
||||||
HBaseConfiguration.create());
|
ThriftHBaseServiceHandler.newInstance(conf, metrics);
|
||||||
THBaseService.Processor processor = new THBaseService.Processor(handler);
|
THBaseService.Processor processor = new THBaseService.Processor(handler);
|
||||||
|
|
||||||
boolean framed = cmd.hasOption("framed") || nonblocking || hsha;
|
boolean framed = cmd.hasOption("framed") || nonblocking || hsha;
|
||||||
|
@ -217,7 +247,7 @@ public class ThriftServer {
|
||||||
if (nonblocking) {
|
if (nonblocking) {
|
||||||
server = getTNonBlockingServer(protocolFactory, processor, transportFactory, inetSocketAddress);
|
server = getTNonBlockingServer(protocolFactory, processor, transportFactory, inetSocketAddress);
|
||||||
} else if (hsha) {
|
} else if (hsha) {
|
||||||
server = getTHsHaServer(protocolFactory, processor, transportFactory, inetSocketAddress);
|
server = getTHsHaServer(protocolFactory, processor, transportFactory, inetSocketAddress, metrics);
|
||||||
} else {
|
} else {
|
||||||
server = getTThreadPoolServer(protocolFactory, processor, transportFactory, inetSocketAddress);
|
server = getTThreadPoolServer(protocolFactory, processor, transportFactory, inetSocketAddress);
|
||||||
}
|
}
|
||||||
|
|
|
@ -31,6 +31,7 @@ import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
import org.apache.hadoop.hbase.SmallTests;
|
import org.apache.hadoop.hbase.SmallTests;
|
||||||
import org.apache.hadoop.hbase.thrift.CallQueue.Call;
|
import org.apache.hadoop.hbase.thrift.CallQueue.Call;
|
||||||
|
import org.apache.hadoop.hbase.thrift.generated.Hbase;
|
||||||
import org.apache.hadoop.metrics.ContextFactory;
|
import org.apache.hadoop.metrics.ContextFactory;
|
||||||
import org.apache.hadoop.metrics.MetricsContext;
|
import org.apache.hadoop.metrics.MetricsContext;
|
||||||
import org.apache.hadoop.metrics.MetricsUtil;
|
import org.apache.hadoop.metrics.MetricsUtil;
|
||||||
|
@ -106,7 +107,8 @@ public class TestCallQueue {
|
||||||
private static ThriftMetrics createMetrics() throws Exception {
|
private static ThriftMetrics createMetrics() throws Exception {
|
||||||
setupMetricsContext();
|
setupMetricsContext();
|
||||||
Configuration conf = UTIL.getConfiguration();
|
Configuration conf = UTIL.getConfiguration();
|
||||||
return new ThriftMetrics(ThriftServerRunner.DEFAULT_LISTEN_PORT, conf);
|
return new ThriftMetrics(
|
||||||
|
ThriftServerRunner.DEFAULT_LISTEN_PORT, conf, Hbase.Iface.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void setupMetricsContext() throws Exception {
|
private static void setupMetricsContext() throws Exception {
|
||||||
|
|
|
@ -141,7 +141,7 @@ public class TestThriftServer {
|
||||||
|
|
||||||
private static ThriftMetrics getMetrics(Configuration conf) throws Exception {
|
private static ThriftMetrics getMetrics(Configuration conf) throws Exception {
|
||||||
setupMetricsContext();
|
setupMetricsContext();
|
||||||
return new ThriftMetrics(ThriftServerRunner.DEFAULT_LISTEN_PORT, conf);
|
return new ThriftMetrics(ThriftServerRunner.DEFAULT_LISTEN_PORT, conf, Hbase.Iface.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void setupMetricsContext() throws IOException {
|
private static void setupMetricsContext() throws IOException {
|
||||||
|
|
|
@ -19,6 +19,13 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.thrift2;
|
package org.apache.hadoop.hbase.thrift2;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertArrayEquals;
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.junit.Assert.assertNull;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
@ -26,15 +33,22 @@ import java.util.Collections;
|
||||||
import java.util.Comparator;
|
import java.util.Comparator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.*;
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
|
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||||
|
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||||
|
import org.apache.hadoop.hbase.MediumTests;
|
||||||
import org.apache.hadoop.hbase.client.HBaseAdmin;
|
import org.apache.hadoop.hbase.client.HBaseAdmin;
|
||||||
|
import org.apache.hadoop.hbase.thrift.ThriftMetrics;
|
||||||
import org.apache.hadoop.hbase.thrift2.generated.TColumn;
|
import org.apache.hadoop.hbase.thrift2.generated.TColumn;
|
||||||
import org.apache.hadoop.hbase.thrift2.generated.TColumnIncrement;
|
import org.apache.hadoop.hbase.thrift2.generated.TColumnIncrement;
|
||||||
import org.apache.hadoop.hbase.thrift2.generated.TColumnValue;
|
import org.apache.hadoop.hbase.thrift2.generated.TColumnValue;
|
||||||
import org.apache.hadoop.hbase.thrift2.generated.TDelete;
|
import org.apache.hadoop.hbase.thrift2.generated.TDelete;
|
||||||
import org.apache.hadoop.hbase.thrift2.generated.TDeleteType;
|
import org.apache.hadoop.hbase.thrift2.generated.TDeleteType;
|
||||||
import org.apache.hadoop.hbase.thrift2.generated.TGet;
|
import org.apache.hadoop.hbase.thrift2.generated.TGet;
|
||||||
|
import org.apache.hadoop.hbase.thrift2.generated.THBaseService;
|
||||||
import org.apache.hadoop.hbase.thrift2.generated.TIOError;
|
import org.apache.hadoop.hbase.thrift2.generated.TIOError;
|
||||||
import org.apache.hadoop.hbase.thrift2.generated.TIllegalArgument;
|
import org.apache.hadoop.hbase.thrift2.generated.TIllegalArgument;
|
||||||
import org.apache.hadoop.hbase.thrift2.generated.TIncrement;
|
import org.apache.hadoop.hbase.thrift2.generated.TIncrement;
|
||||||
|
@ -42,6 +56,11 @@ import org.apache.hadoop.hbase.thrift2.generated.TPut;
|
||||||
import org.apache.hadoop.hbase.thrift2.generated.TResult;
|
import org.apache.hadoop.hbase.thrift2.generated.TResult;
|
||||||
import org.apache.hadoop.hbase.thrift2.generated.TScan;
|
import org.apache.hadoop.hbase.thrift2.generated.TScan;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.metrics.ContextFactory;
|
||||||
|
import org.apache.hadoop.metrics.MetricsContext;
|
||||||
|
import org.apache.hadoop.metrics.MetricsUtil;
|
||||||
|
import org.apache.hadoop.metrics.spi.NoEmitMetricsContext;
|
||||||
|
import org.apache.hadoop.metrics.spi.OutputRecord;
|
||||||
import org.apache.thrift.TException;
|
import org.apache.thrift.TException;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
@ -49,13 +68,13 @@ import org.junit.BeforeClass;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.experimental.categories.Category;
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
import static org.junit.Assert.*;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Unit testing for ThriftServer.HBaseHandler, a part of the org.apache.hadoop.hbase.thrift2 package.
|
* Unit testing for ThriftServer.HBaseHandler, a part of the org.apache.hadoop.hbase.thrift2 package.
|
||||||
*/
|
*/
|
||||||
@Category(MediumTests.class)
|
@Category(MediumTests.class)
|
||||||
public class TestThriftHBaseServiceHandler {
|
public class TestThriftHBaseServiceHandler {
|
||||||
|
|
||||||
|
public static final Log LOG = LogFactory.getLog(TestThriftHBaseServiceHandler.class);
|
||||||
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
|
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
|
||||||
|
|
||||||
// Static names for tables, columns, rows, and values
|
// Static names for tables, columns, rows, and values
|
||||||
|
@ -513,6 +532,77 @@ public class TestThriftHBaseServiceHandler {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMetrics() throws Exception {
|
||||||
|
Configuration conf = UTIL.getConfiguration();
|
||||||
|
ThriftMetrics metrics = getMetrics(conf);
|
||||||
|
THBaseService.Iface handler =
|
||||||
|
ThriftHBaseServiceHandler.newInstance(conf, metrics);
|
||||||
|
byte[] rowName = "testMetrics".getBytes();
|
||||||
|
ByteBuffer table = ByteBuffer.wrap(tableAname);
|
||||||
|
|
||||||
|
TGet get = new TGet(ByteBuffer.wrap(rowName));
|
||||||
|
assertFalse(handler.exists(table, get));
|
||||||
|
|
||||||
|
List<TColumnValue> columnValues = new ArrayList<TColumnValue>();
|
||||||
|
columnValues.add(new TColumnValue(ByteBuffer.wrap(familyAname),
|
||||||
|
ByteBuffer.wrap(qualifierAname),
|
||||||
|
ByteBuffer.wrap(valueAname)));
|
||||||
|
columnValues.add(new TColumnValue(ByteBuffer.wrap(familyBname),
|
||||||
|
ByteBuffer.wrap(qualifierBname),
|
||||||
|
ByteBuffer.wrap(valueBname)));
|
||||||
|
TPut put = new TPut(ByteBuffer.wrap(rowName), columnValues);
|
||||||
|
put.setColumnValues(columnValues);
|
||||||
|
|
||||||
|
handler.put(table, put);
|
||||||
|
|
||||||
|
assertTrue(handler.exists(table, get));
|
||||||
|
logMetrics(metrics);
|
||||||
|
verifyMetrics(metrics, "put_num_ops", 1);
|
||||||
|
verifyMetrics(metrics, "exists_num_ops", 2);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static ThriftMetrics getMetrics(Configuration conf) throws Exception {
|
||||||
|
setupMetricsContext();
|
||||||
|
return new ThriftMetrics(Integer.parseInt(ThriftServer.DEFAULT_LISTEN_PORT),
|
||||||
|
conf, THBaseService.Iface.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void setupMetricsContext() throws IOException {
|
||||||
|
ContextFactory factory = ContextFactory.getFactory();
|
||||||
|
factory.setAttribute(ThriftMetrics.CONTEXT_NAME + ".class",
|
||||||
|
NoEmitMetricsContext.class.getName());
|
||||||
|
MetricsUtil.getContext(ThriftMetrics.CONTEXT_NAME)
|
||||||
|
.createRecord(ThriftMetrics.CONTEXT_NAME).remove();
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void logMetrics(ThriftMetrics metrics) throws Exception {
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
MetricsContext context = MetricsUtil.getContext(
|
||||||
|
ThriftMetrics.CONTEXT_NAME);
|
||||||
|
metrics.doUpdates(context);
|
||||||
|
for (String key : context.getAllRecords().keySet()) {
|
||||||
|
for (OutputRecord record : context.getAllRecords().get(key)) {
|
||||||
|
for (String name : record.getMetricNames()) {
|
||||||
|
LOG.debug("metrics:" + name + " value:" +
|
||||||
|
record.getMetric(name).intValue());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void verifyMetrics(ThriftMetrics metrics, String name, int expectValue)
|
||||||
|
throws Exception {
|
||||||
|
MetricsContext context = MetricsUtil.getContext(
|
||||||
|
ThriftMetrics.CONTEXT_NAME);
|
||||||
|
metrics.doUpdates(context);
|
||||||
|
OutputRecord record = context.getAllRecords().get(
|
||||||
|
ThriftMetrics.CONTEXT_NAME).iterator().next();
|
||||||
|
assertEquals(expectValue, record.getMetric(name).intValue());
|
||||||
|
}
|
||||||
|
|
||||||
@org.junit.Rule
|
@org.junit.Rule
|
||||||
public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =
|
public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =
|
||||||
new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();
|
new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();
|
||||||
|
|
Loading…
Reference in New Issue