From 4b4904a77cb1e4611105308119c56fa4a5a32121 Mon Sep 17 00:00:00 2001 From: Zhihong Yu Date: Thu, 2 Feb 2012 17:30:13 +0000 Subject: [PATCH] HBASE-5186 Add metrics to ThriftServer (Scott Chen) git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1239737 13f79535-47bb-0310-9956-ffa450edef68 --- .../regionserver/HRegionThriftServer.java | 1 - .../apache/hadoop/hbase/thrift/CallQueue.java | 260 ++++++++++++++++++ .../thrift/HbaseHandlerMetricsProxy.java | 80 ++++++ .../thrift/TBoundedThreadPoolServer.java | 24 +- .../hadoop/hbase/thrift/ThriftMetrics.java | 131 +++++++++ .../hbase/thrift/ThriftServerRunner.java | 56 +++- .../hadoop/hbase/thrift/TestCallQueue.java | 142 ++++++++++ .../hadoop/hbase/thrift/TestThriftServer.java | 70 ++++- 8 files changed, 734 insertions(+), 30 deletions(-) create mode 100644 src/main/java/org/apache/hadoop/hbase/thrift/CallQueue.java create mode 100644 src/main/java/org/apache/hadoop/hbase/thrift/HbaseHandlerMetricsProxy.java create mode 100644 src/main/java/org/apache/hadoop/hbase/thrift/ThriftMetrics.java create mode 100644 src/test/java/org/apache/hadoop/hbase/thrift/TestCallQueue.java diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionThriftServer.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionThriftServer.java index 177cf2561a5..8b09d7eb969 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionThriftServer.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionThriftServer.java @@ -50,7 +50,6 @@ import org.apache.hadoop.hbase.thrift.generated.TRowResult; public class HRegionThriftServer extends Thread { public static final Log LOG = LogFactory.getLog(HRegionThriftServer.class); - public static final int DEFAULT_LISTEN_PORT = 9090; private final HRegionServer rs; private final ThriftServerRunner serverRunner; diff --git a/src/main/java/org/apache/hadoop/hbase/thrift/CallQueue.java b/src/main/java/org/apache/hadoop/hbase/thrift/CallQueue.java new file mode 100644 index 00000000000..5bd47a86ae4 --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/thrift/CallQueue.java @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.thrift; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * A BlockingQueue reports waiting time in queue and queue length to + * ThriftMetrics. + */ +public class CallQueue implements BlockingQueue { + private static Log LOG = LogFactory.getLog(CallQueue.class); + + private final BlockingQueue underlyingQueue; + private final ThriftMetrics metrics; + + public CallQueue(BlockingQueue underlyingQueue, + ThriftMetrics metrics) { + this.underlyingQueue = underlyingQueue; + this.metrics = metrics; + } + + private static long now() { + return System.nanoTime(); + } + + public static class Call implements Runnable { + final long startTime; + final Runnable underlyingRunnable; + + Call(Runnable underlyingRunnable) { + this.underlyingRunnable = underlyingRunnable; + this.startTime = now(); + } + + @Override + public void run() { + underlyingRunnable.run(); + } + + public long timeInQueue() { + return now() - startTime; + } + + @Override + public boolean equals(Object other) { + if (other instanceof Call) { + Call otherCall = (Call)(other); + return this.underlyingRunnable.equals(otherCall.underlyingRunnable); + } else if (other instanceof Runnable) { + return this.underlyingRunnable.equals(other); + } + return false; + } + + @Override + public int hashCode() { + return this.underlyingRunnable.hashCode(); + } + } + + @Override + public Runnable poll() { + Call result = underlyingQueue.poll(); + updateMetrics(result); + return result; + } + + private void updateMetrics(Call result) { + if (result == null) { + return; + } + metrics.incTimeInQueue(result.timeInQueue()); + metrics.setCallQueueLen(this.size()); + } + + @Override + public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException { + Call result = underlyingQueue.poll(timeout, unit); + updateMetrics(result); + return result; + } + + @Override + public Runnable remove() { + Call result = underlyingQueue.remove(); + updateMetrics(result); + return result; + } + + @Override + public Runnable take() throws InterruptedException { + Call result = underlyingQueue.take(); + updateMetrics(result); + return result; + } + + @Override + public int drainTo(Collection destination) { + return drainTo(destination, Integer.MAX_VALUE); + } + + @Override + public int drainTo(Collection destination, + int maxElements) { + if (destination == this) { + throw new IllegalArgumentException( + "A BlockingQueue cannot drain to itself."); + } + List drained = new ArrayList(); + underlyingQueue.drainTo(drained, maxElements); + for (Call r : drained) { + updateMetrics(r); + } + destination.addAll(drained); + int sz = drained.size(); + LOG.info("Elements drained: " + sz); + return sz; + } + + + @Override + public boolean offer(Runnable element) { + return underlyingQueue.offer(new Call(element)); + } + + @Override + public boolean offer(Runnable element, long timeout, TimeUnit unit) + throws InterruptedException { + return underlyingQueue.offer(new Call(element), timeout, unit); + } + @Override + public void put(Runnable element) throws InterruptedException { + underlyingQueue.put(new Call(element)); + } + + @Override + public boolean add(Runnable element) { + return underlyingQueue.add(new Call(element)); + } + + @Override + public boolean addAll(Collection elements) { + int added = 0; + for (Runnable r : elements) { + added += underlyingQueue.add(new Call(r)) ? 1 : 0; + } + return added != 0; + } + + @Override + public Runnable element() { + return underlyingQueue.element(); + } + + @Override + public Runnable peek() { + return underlyingQueue.peek(); + } + + @Override + public void clear() { + underlyingQueue.clear(); + } + + @Override + public boolean containsAll(Collection elements) { + return underlyingQueue.containsAll(elements); + } + + @Override + public boolean isEmpty() { + return underlyingQueue.isEmpty(); + } + + @Override + public Iterator iterator() { + return new Iterator() { + final Iterator underlyingIterator = underlyingQueue.iterator(); + @Override + public Runnable next() { + return underlyingIterator.next(); + } + + @Override + public boolean hasNext() { + return underlyingIterator.hasNext(); + } + + @Override + public void remove() { + underlyingIterator.remove(); + } + }; + } + + @Override + public boolean removeAll(Collection elements) { + return underlyingQueue.removeAll(elements); + } + + @Override + public boolean retainAll(Collection elements) { + return underlyingQueue.retainAll(elements); + } + + @Override + public int size() { + return underlyingQueue.size(); + } + + @Override + public Object[] toArray() { + return underlyingQueue.toArray(); + } + + @Override + public T[] toArray(T[] array) { + return underlyingQueue.toArray(array); + } + + @Override + public boolean contains(Object element) { + return underlyingQueue.contains(element); + } + + @Override + public int remainingCapacity() { + return underlyingQueue.remainingCapacity(); + } + + @Override + public boolean remove(Object element) { + return underlyingQueue.remove(element); + } +} diff --git a/src/main/java/org/apache/hadoop/hbase/thrift/HbaseHandlerMetricsProxy.java b/src/main/java/org/apache/hadoop/hbase/thrift/HbaseHandlerMetricsProxy.java new file mode 100644 index 00000000000..3580b4fa4ff --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/thrift/HbaseHandlerMetricsProxy.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.thrift; + +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.thrift.generated.Hbase; + + +/** + * Converts a Hbase.Iface using InvocationHandler so that it reports process + * time of each call to ThriftMetrics. + */ +public class HbaseHandlerMetricsProxy implements InvocationHandler { + + public static final Log LOG = LogFactory.getLog( + HbaseHandlerMetricsProxy.class); + + private final Hbase.Iface handler; + private final ThriftMetrics metrics; + + public static Hbase.Iface newInstance(Hbase.Iface handler, + ThriftMetrics metrics, + Configuration conf) { + return (Hbase.Iface) Proxy.newProxyInstance( + handler.getClass().getClassLoader(), + handler.getClass().getInterfaces(), + new HbaseHandlerMetricsProxy(handler, metrics, conf)); + } + + private HbaseHandlerMetricsProxy( + Hbase.Iface handler, ThriftMetrics metrics, Configuration conf) { + this.handler = handler; + this.metrics = metrics; + } + + @Override + public Object invoke(Object proxy, Method m, Object[] args) + throws Throwable { + Object result; + try { + long start = now(); + result = m.invoke(handler, args); + int processTime = (int)(now() - start); + metrics.incMethodTime(m.getName(), processTime); + } catch (InvocationTargetException e) { + throw e.getTargetException(); + } catch (Exception e) { + throw new RuntimeException( + "unexpected invocation exception: " + e.getMessage()); + } + return result; + } + + private static long now() { + return System.nanoTime(); + } +} diff --git a/src/main/java/org/apache/hadoop/hbase/thrift/TBoundedThreadPoolServer.java b/src/main/java/org/apache/hadoop/hbase/thrift/TBoundedThreadPoolServer.java index a88634e7416..550941d1da2 100644 --- a/src/main/java/org/apache/hadoop/hbase/thrift/TBoundedThreadPoolServer.java +++ b/src/main/java/org/apache/hadoop/hbase/thrift/TBoundedThreadPoolServer.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hbase.thrift; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; @@ -29,6 +28,7 @@ import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.thrift.CallQueue.Call; import org.apache.hadoop.hbase.util.Threads; import org.apache.thrift.TException; import org.apache.thrift.TProcessor; @@ -103,6 +103,8 @@ public class TBoundedThreadPoolServer extends TServer { private static final Log LOG = LogFactory.getLog( TBoundedThreadPoolServer.class.getName()); + private final CallQueue callQueue; + public static class Args extends TThreadPoolServer.Args { int maxQueuedRequests; int threadKeepAliveTimeSec; @@ -135,15 +137,14 @@ public class TBoundedThreadPoolServer extends TServer { private Args serverOptions; - public TBoundedThreadPoolServer(Args options) { + public TBoundedThreadPoolServer(Args options, ThriftMetrics metrics) { super(options); - BlockingQueue executorQueue; if (options.maxQueuedRequests > 0) { - executorQueue = new LinkedBlockingQueue( - options.maxQueuedRequests); + this.callQueue = new CallQueue( + new LinkedBlockingQueue(options.maxQueuedRequests), metrics); } else { - executorQueue = new SynchronousQueue(); + this.callQueue = new CallQueue(new SynchronousQueue(), metrics); } ThreadFactoryBuilder tfb = new ThreadFactoryBuilder(); @@ -151,11 +152,18 @@ public class TBoundedThreadPoolServer extends TServer { tfb.setNameFormat("thrift-worker-%d"); executorService = new ThreadPoolExecutor(options.minWorkerThreads, - options.maxWorkerThreads, options.threadKeepAliveTimeSec, TimeUnit.SECONDS, - executorQueue, tfb.build()); + options.maxWorkerThreads, options.threadKeepAliveTimeSec, + TimeUnit.SECONDS, this.callQueue, tfb.build()); serverOptions = options; } + /** + * Return the server working queue + */ + public CallQueue getCallQueue() { + return this.callQueue; + } + public void serve() { try { serverTransport_.listen(); diff --git a/src/main/java/org/apache/hadoop/hbase/thrift/ThriftMetrics.java b/src/main/java/org/apache/hadoop/hbase/thrift/ThriftMetrics.java new file mode 100644 index 00000000000..a48ff3c1347 --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/thrift/ThriftMetrics.java @@ -0,0 +1,131 @@ +/* + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.hbase.thrift; + +import java.lang.reflect.Method; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.thrift.generated.Hbase; +import org.apache.hadoop.metrics.MetricsContext; +import org.apache.hadoop.metrics.MetricsRecord; +import org.apache.hadoop.metrics.MetricsUtil; +import org.apache.hadoop.metrics.Updater; +import org.apache.hadoop.metrics.util.MetricsBase; +import org.apache.hadoop.metrics.util.MetricsIntValue; +import org.apache.hadoop.metrics.util.MetricsRegistry; +import org.apache.hadoop.metrics.util.MetricsTimeVaryingInt; +import org.apache.hadoop.metrics.util.MetricsTimeVaryingLong; +import org.apache.hadoop.metrics.util.MetricsTimeVaryingRate; + +/** + * This class is for maintaining the various statistics of thrift server + * and publishing them through the metrics interfaces. + */ +public class ThriftMetrics implements Updater { + public final static Log LOG = LogFactory.getLog(ThriftMetrics.class); + public final static String CONTEXT_NAME = "thriftserver"; + + private final MetricsContext context; + private final MetricsRecord metricsRecord; + private final MetricsRegistry registry = new MetricsRegistry(); + private final long slowResponseTime; + public static final String SLOW_RESPONSE_NANO_SEC = + "hbase.thrift.slow.response.nano.second"; + public static final long DEFAULT_SLOW_RESPONSE_NANO_SEC = 10 * 1000 * 1000; + + private final MetricsIntValue callQueueLen = + new MetricsIntValue("callQueueLen", registry); + private final MetricsTimeVaryingRate timeInQueue = + new MetricsTimeVaryingRate("timeInQueue", registry); + private MetricsTimeVaryingRate thriftCall = + new MetricsTimeVaryingRate("thriftCall", registry); + private MetricsTimeVaryingRate slowThriftCall = + new MetricsTimeVaryingRate("slowThriftCall", registry); + + public ThriftMetrics(int port, Configuration conf) { + slowResponseTime = conf.getLong( + SLOW_RESPONSE_NANO_SEC, DEFAULT_SLOW_RESPONSE_NANO_SEC); + context = MetricsUtil.getContext(CONTEXT_NAME); + metricsRecord = MetricsUtil.createRecord(context, CONTEXT_NAME); + + metricsRecord.setTag("port", port + ""); + + LOG.info("Initializing RPC Metrics with port=" + port); + + context.registerUpdater(this); + + createMetricsForMethods(Hbase.Iface.class); + } + + public void incTimeInQueue(long time) { + timeInQueue.inc(time); + } + + public void setCallQueueLen(int len) { + callQueueLen.set(len); + } + + public void incMethodTime(String name, int time) { + MetricsTimeVaryingRate methodTimeMetrc = getMethodTimeMetrics(name); + if (methodTimeMetrc == null) { + LOG.warn( + "Got incMethodTime() request for method that doesnt exist: " + name); + return; // ignore methods that dont exist. + } + + // inc method specific processTime + methodTimeMetrc.inc(time); + + // inc general processTime + thriftCall.inc(time); + if (time > slowResponseTime) { + slowThriftCall.inc(time); + } + } + + private void createMetricsForMethods(Class iface) { + for (Method m : iface.getDeclaredMethods()) { + if (getMethodTimeMetrics(m.getName()) == null) + LOG.debug("Creating metrics for method:" + m.getName()); + createMethodTimeMetrics(m.getName()); + } + } + + private MetricsTimeVaryingRate getMethodTimeMetrics(String key) { + return (MetricsTimeVaryingRate) registry.get(key); + } + + private MetricsTimeVaryingRate createMethodTimeMetrics(String key) { + return new MetricsTimeVaryingRate(key, this.registry); + } + + /** + * Push the metrics to the monitoring subsystem on doUpdate() call. + */ + public void doUpdates(final MetricsContext context) { + // getMetricsList() and pushMetric() are thread safe methods + for (MetricsBase m : registry.getMetricsList()) { + m.pushMetric(metricsRecord); + } + metricsRecord.update(); + } +} diff --git a/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java b/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java index b9f197ceca4..e5f7a48ec49 100644 --- a/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java +++ b/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java @@ -31,6 +31,11 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.TreeMap; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; @@ -57,12 +62,13 @@ import org.apache.hadoop.hbase.filter.ParseFilter; import org.apache.hadoop.hbase.filter.PrefixFilter; import org.apache.hadoop.hbase.filter.WhileMatchFilter; import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.thrift.CallQueue.Call; import org.apache.hadoop.hbase.thrift.generated.AlreadyExists; import org.apache.hadoop.hbase.thrift.generated.BatchMutation; import org.apache.hadoop.hbase.thrift.generated.ColumnDescriptor; import org.apache.hadoop.hbase.thrift.generated.Hbase; -import org.apache.hadoop.hbase.thrift.generated.IOError; import org.apache.hadoop.hbase.thrift.generated.IllegalArgument; +import org.apache.hadoop.hbase.thrift.generated.IOError; import org.apache.hadoop.hbase.thrift.generated.Mutation; import org.apache.hadoop.hbase.thrift.generated.TCell; import org.apache.hadoop.hbase.thrift.generated.TRegionInfo; @@ -89,6 +95,7 @@ import org.apache.thrift.transport.TServerTransport; import org.apache.thrift.transport.TTransportFactory; import com.google.common.base.Joiner; +import com.google.common.util.concurrent.ThreadFactoryBuilder; /** * ThriftServerRunner - this class starts up a Thrift server which implements @@ -107,11 +114,14 @@ public class ThriftServerRunner implements Runnable { static final String PORT_CONF_KEY = "hbase.regionserver.thrift.port"; private static final String DEFAULT_BIND_ADDR = "0.0.0.0"; - private static final int DEFAULT_LISTEN_PORT = 9090; + public static final int DEFAULT_LISTEN_PORT = 9090; + private final int listenPort; private Configuration conf; volatile TServer tserver; - private final HBaseHandler handler; + private final Hbase.Iface handler; + private final ThriftMetrics metrics; + private CallQueue callQueue; /** An enum of server implementation selections */ enum ImplType { @@ -214,7 +224,10 @@ public class ThriftServerRunner implements Runnable { public ThriftServerRunner(Configuration conf, HBaseHandler handler) { this.conf = HBaseConfiguration.create(conf); - this.handler = handler; + this.listenPort = conf.getInt(PORT_CONF_KEY, DEFAULT_LISTEN_PORT); + this.metrics = new ThriftMetrics(listenPort, conf); + this.handler = HbaseHandlerMetricsProxy.newInstance(handler, metrics, conf); + } /* @@ -243,9 +256,6 @@ public class ThriftServerRunner implements Runnable { * Setting up the thrift TServer */ private void setupServer() throws Exception { - // Get port to bind to - int listenPort = conf.getInt(PORT_CONF_KEY, DEFAULT_LISTEN_PORT); - // Construct correct ProtocolFactory TProtocolFactory protocolFactory; if (conf.getBoolean(COMPACT_CONF_KEY, false)) { @@ -293,14 +303,22 @@ public class ThriftServerRunner implements Runnable { tserver = new TNonblockingServer(serverArgs); } else if (implType == ImplType.HS_HA) { THsHaServer.Args serverArgs = new THsHaServer.Args(serverTransport); - serverArgs.processor(processor) + this.callQueue = new CallQueue(new LinkedBlockingQueue(), metrics); + ExecutorService executorService = createExecutor( + this.callQueue, serverArgs.getWorkerThreads()); + serverArgs.executorService(executorService) + .processor(processor) .transportFactory(transportFactory) .protocolFactory(protocolFactory); tserver = new THsHaServer(serverArgs); } else { // THREADED_SELECTOR TThreadedSelectorServer.Args serverArgs = new HThreadedSelectorServerArgs(serverTransport, conf); - serverArgs.processor(processor) + this.callQueue = new CallQueue(new LinkedBlockingQueue(), metrics); + ExecutorService executorService = createExecutor( + this.callQueue, serverArgs.getWorkerThreads()); + serverArgs.executorService(executorService) + .processor(processor) .transportFactory(transportFactory) .protocolFactory(protocolFactory); tserver = new TThreadedSelectorServer(serverArgs); @@ -322,7 +340,10 @@ public class ThriftServerRunner implements Runnable { LOG.info("starting " + ImplType.THREAD_POOL.simpleClassName() + " on " + listenAddress + ":" + Integer.toString(listenPort) + "; " + serverArgs); - tserver = new TBoundedThreadPoolServer(serverArgs); + TBoundedThreadPoolServer tserver = + new TBoundedThreadPoolServer(serverArgs, metrics); + this.callQueue = tserver.getCallQueue(); + this.tserver = tserver; } else { throw new AssertionError("Unsupported Thrift server implementation: " + implType.simpleClassName()); @@ -336,7 +357,6 @@ public class ThriftServerRunner implements Runnable { } // login the server principal (if using secure Hadoop) - Configuration conf = handler.conf; if (User.isSecurityEnabled() && User.isHBaseSecurityEnabled(conf)) { String machineName = Strings.domainNamePointerToHostName( DNS.getDefaultHost(conf.get("hbase.thrift.dns.interface", "default"), @@ -346,12 +366,21 @@ public class ThriftServerRunner implements Runnable { } } + ExecutorService createExecutor(BlockingQueue callQueue, + int workerThreads) { + ThreadFactoryBuilder tfb = new ThreadFactoryBuilder(); + tfb.setDaemon(true); + tfb.setNameFormat("thrift-worker-%d"); + return new ThreadPoolExecutor(workerThreads, workerThreads, + Long.MAX_VALUE, TimeUnit.SECONDS, callQueue, tfb.build()); + } + private InetAddress getBindAddress(Configuration conf) throws UnknownHostException { String bindAddressStr = conf.get(BIND_CONF_KEY, DEFAULT_BIND_ADDR); return InetAddress.getByName(bindAddressStr); } - + /** * The HBaseHandler is a glue object that connects Thrift RPC calls to the * HBase client API primarily defined in the HBaseAdmin and HTable objects. @@ -456,8 +485,7 @@ public class ThriftServerRunner implements Runnable { this(HBaseConfiguration.create()); } - protected HBaseHandler(final Configuration c) - throws IOException { + protected HBaseHandler(final Configuration c) throws IOException { this.conf = c; admin = new HBaseAdmin(conf); scannerMap = new HashMap(); diff --git a/src/test/java/org/apache/hadoop/hbase/thrift/TestCallQueue.java b/src/test/java/org/apache/hadoop/hbase/thrift/TestCallQueue.java new file mode 100644 index 00000000000..ef105ab3444 --- /dev/null +++ b/src/test/java/org/apache/hadoop/hbase/thrift/TestCallQueue.java @@ -0,0 +1,142 @@ +/* + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.thrift; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.concurrent.LinkedBlockingQueue; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.SmallTests; +import org.apache.hadoop.hbase.thrift.CallQueue.Call; +import org.apache.hadoop.metrics.ContextFactory; +import org.apache.hadoop.metrics.MetricsContext; +import org.apache.hadoop.metrics.MetricsUtil; +import org.apache.hadoop.metrics.spi.NoEmitMetricsContext; +import org.apache.hadoop.metrics.spi.OutputRecord; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; +import org.junit.Test; + +/** + * Unit testing for CallQueue, a part of the + * org.apache.hadoop.hbase.thrift package. + */ +@Category(SmallTests.class) +@RunWith(Parameterized.class) +public class TestCallQueue { + + public static final Log LOG = LogFactory.getLog(TestCallQueue.class); + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + + private int elementsAdded; + private int elementsRemoved; + + @Parameters + public static Collection getParameters() { + Collection parameters = new ArrayList(); + for (int elementsAdded : new int[] {100, 200, 300}) { + for (int elementsRemoved : new int[] {0, 20, 100}) { + parameters.add(new Object[]{new Integer(elementsAdded), + new Integer(elementsRemoved)}); + } + } + return parameters; + } + + public TestCallQueue(int elementsAdded, int elementsRemoved) { + this.elementsAdded = elementsAdded; + this.elementsRemoved = elementsRemoved; + LOG.debug("elementsAdded:" + elementsAdded + + " elementsRemoved:" + elementsRemoved); + } + + @Test(timeout=3000) + public void testPutTake() throws Exception { + ThriftMetrics metrics = createMetrics(); + CallQueue callQueue = new CallQueue( + new LinkedBlockingQueue(), metrics); + for (int i = 0; i < elementsAdded; ++i) { + callQueue.put(createDummyRunnable()); + } + for (int i = 0; i < elementsRemoved; ++i) { + callQueue.take(); + } + verifyMetrics(metrics, "timeInQueue_num_ops", elementsRemoved); + } + + @Test(timeout=3000) + public void testOfferPoll() throws Exception { + ThriftMetrics metrics = createMetrics(); + CallQueue callQueue = new CallQueue( + new LinkedBlockingQueue(), metrics); + for (int i = 0; i < elementsAdded; ++i) { + callQueue.offer(createDummyRunnable()); + } + for (int i = 0; i < elementsRemoved; ++i) { + callQueue.poll(); + } + verifyMetrics(metrics, "timeInQueue_num_ops", elementsRemoved); + } + + private static ThriftMetrics createMetrics() throws Exception { + setupMetricsContext(); + Configuration conf = UTIL.getConfiguration(); + return new ThriftMetrics(ThriftServerRunner.DEFAULT_LISTEN_PORT, conf); + } + + private static void setupMetricsContext() throws Exception { + ContextFactory factory = ContextFactory.getFactory(); + factory.setAttribute(ThriftMetrics.CONTEXT_NAME + ".class", + NoEmitMetricsContext.class.getName()); + MetricsUtil.getContext(ThriftMetrics.CONTEXT_NAME) + .createRecord(ThriftMetrics.CONTEXT_NAME).remove(); + } + + private static void verifyMetrics(ThriftMetrics metrics, String name, int expectValue) + throws Exception { + MetricsContext context = MetricsUtil.getContext( + ThriftMetrics.CONTEXT_NAME); + metrics.doUpdates(context); + OutputRecord record = context.getAllRecords().get( + ThriftMetrics.CONTEXT_NAME).iterator().next(); + assertEquals(expectValue, record.getMetric(name).intValue()); + } + + private static Runnable createDummyRunnable() { + return new Runnable() { + @Override + public void run() { + } + }; + } + + @org.junit.Rule + public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu = + new org.apache.hadoop.hbase.ResourceCheckerJUnitRule(); +} + diff --git a/src/test/java/org/apache/hadoop/hbase/thrift/TestThriftServer.java b/src/test/java/org/apache/hadoop/hbase/thrift/TestThriftServer.java index a2a9dd48f47..a56f1afcca5 100644 --- a/src/test/java/org/apache/hadoop/hbase/thrift/TestThriftServer.java +++ b/src/test/java/org/apache/hadoop/hbase/thrift/TestThriftServer.java @@ -19,14 +19,20 @@ */ package org.apache.hadoop.hbase.thrift; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; +import java.util.Map; +import java.util.Collection; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.MediumTests; +import org.junit.experimental.categories.Category; +import org.junit.Test; +import org.junit.BeforeClass; import org.apache.hadoop.hbase.thrift.generated.BatchMutation; import org.apache.hadoop.hbase.thrift.generated.ColumnDescriptor; import org.apache.hadoop.hbase.thrift.generated.Hbase; @@ -34,10 +40,15 @@ import org.apache.hadoop.hbase.thrift.generated.Mutation; import org.apache.hadoop.hbase.thrift.generated.TCell; import org.apache.hadoop.hbase.thrift.generated.TRowResult; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.metrics.ContextFactory; +import org.apache.hadoop.metrics.MetricsContext; +import org.apache.hadoop.metrics.MetricsUtil; +import org.apache.hadoop.metrics.spi.NoEmitMetricsContext; +import org.apache.hadoop.metrics.spi.OutputRecord; import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; -import org.junit.experimental.categories.Category; +import org.apache.hadoop.hbase.MediumTests; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.conf.Configuration; /** * Unit testing for ThriftServerRunner.HBaseHandler, a part of the @@ -82,9 +93,11 @@ public class TestThriftServer { * * @throws Exception */ + @Test public void testAll() throws Exception { // Run all tests doTestTableCreateDrop(); + doTestThriftMetrics(); doTestTableMutations(); doTestTableTimestampsAndColumns(); doTestTableScanners(); @@ -98,7 +111,6 @@ public class TestThriftServer { * * @throws Exception */ - @Test public void doTestTableCreateDrop() throws Exception { ThriftServerRunner.HBaseHandler handler = new ThriftServerRunner.HBaseHandler(UTIL.getConfiguration()); @@ -106,6 +118,50 @@ public class TestThriftServer { dropTestTables(handler); } + /** + * Tests if the metrics for thrift handler work correctly + */ + public void doTestThriftMetrics() throws Exception { + Configuration conf = UTIL.getConfiguration(); + ThriftMetrics metrics = getMetrics(conf); + Hbase.Iface handler = getHandler(metrics, conf); + createTestTables(handler); + dropTestTables(handler); + verifyMetrics(metrics, "createTable_num_ops", 2); + verifyMetrics(metrics, "deleteTable_num_ops", 2); + verifyMetrics(metrics, "disableTable_num_ops", 2); + } + + private static Hbase.Iface getHandler(ThriftMetrics metrics, Configuration conf) + throws Exception { + Hbase.Iface handler = + new ThriftServerRunner.HBaseHandler(conf); + return HbaseHandlerMetricsProxy.newInstance(handler, metrics, conf); + } + + private static ThriftMetrics getMetrics(Configuration conf) throws Exception { + setupMetricsContext(); + return new ThriftMetrics(ThriftServerRunner.DEFAULT_LISTEN_PORT, conf); + } + + private static void setupMetricsContext() throws IOException { + ContextFactory factory = ContextFactory.getFactory(); + factory.setAttribute(ThriftMetrics.CONTEXT_NAME + ".class", + NoEmitMetricsContext.class.getName()); + MetricsUtil.getContext(ThriftMetrics.CONTEXT_NAME) + .createRecord(ThriftMetrics.CONTEXT_NAME).remove(); + } + + private static void verifyMetrics(ThriftMetrics metrics, String name, int expectValue) + throws Exception { + MetricsContext context = MetricsUtil.getContext( + ThriftMetrics.CONTEXT_NAME); + metrics.doUpdates(context); + OutputRecord record = context.getAllRecords().get( + ThriftMetrics.CONTEXT_NAME).iterator().next(); + assertEquals(expectValue, record.getMetric(name).intValue()); + } + public static void createTestTables(Hbase.Iface handler) throws Exception { // Create/enable/disable/delete tables, ensure methods act correctly assertEquals(handler.getTableNames().size(), 0);