HBASE-5186 Add metrics to ThriftServer (Scott Chen)
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1239737 13f79535-47bb-0310-9956-ffa450edef68
parent be722211fd
commit 4b4904a77c
HRegionThriftServer.java
@@ -50,7 +50,6 @@ import org.apache.hadoop.hbase.thrift.generated.TRowResult;
 public class HRegionThriftServer extends Thread {

   public static final Log LOG = LogFactory.getLog(HRegionThriftServer.class);
-  public static final int DEFAULT_LISTEN_PORT = 9090;

   private final HRegionServer rs;
   private final ThriftServerRunner serverRunner;
CallQueue.java (new file)
@@ -0,0 +1,260 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.thrift;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * A BlockingQueue reports waiting time in queue and queue length to
 * ThriftMetrics.
 */
public class CallQueue implements BlockingQueue<Runnable> {
  private static Log LOG = LogFactory.getLog(CallQueue.class);

  private final BlockingQueue<Call> underlyingQueue;
  private final ThriftMetrics metrics;

  public CallQueue(BlockingQueue<Call> underlyingQueue,
                   ThriftMetrics metrics) {
    this.underlyingQueue = underlyingQueue;
    this.metrics = metrics;
  }

  private static long now() {
    return System.nanoTime();
  }

  public static class Call implements Runnable {
    final long startTime;
    final Runnable underlyingRunnable;

    Call(Runnable underlyingRunnable) {
      this.underlyingRunnable = underlyingRunnable;
      this.startTime = now();
    }

    @Override
    public void run() {
      underlyingRunnable.run();
    }

    public long timeInQueue() {
      return now() - startTime;
    }

    @Override
    public boolean equals(Object other) {
      if (other instanceof Call) {
        Call otherCall = (Call)(other);
        return this.underlyingRunnable.equals(otherCall.underlyingRunnable);
      } else if (other instanceof Runnable) {
        return this.underlyingRunnable.equals(other);
      }
      return false;
    }

    @Override
    public int hashCode() {
      return this.underlyingRunnable.hashCode();
    }
  }

  @Override
  public Runnable poll() {
    Call result = underlyingQueue.poll();
    updateMetrics(result);
    return result;
  }

  private void updateMetrics(Call result) {
    if (result == null) {
      return;
    }
    metrics.incTimeInQueue(result.timeInQueue());
    metrics.setCallQueueLen(this.size());
  }

  @Override
  public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
    Call result = underlyingQueue.poll(timeout, unit);
    updateMetrics(result);
    return result;
  }

  @Override
  public Runnable remove() {
    Call result = underlyingQueue.remove();
    updateMetrics(result);
    return result;
  }

  @Override
  public Runnable take() throws InterruptedException {
    Call result = underlyingQueue.take();
    updateMetrics(result);
    return result;
  }

  @Override
  public int drainTo(Collection<? super Runnable> destination) {
    return drainTo(destination, Integer.MAX_VALUE);
  }

  @Override
  public int drainTo(Collection<? super Runnable> destination,
                     int maxElements) {
    if (destination == this) {
      throw new IllegalArgumentException(
          "A BlockingQueue cannot drain to itself.");
    }
    List<Call> drained = new ArrayList<Call>();
    underlyingQueue.drainTo(drained, maxElements);
    for (Call r : drained) {
      updateMetrics(r);
    }
    destination.addAll(drained);
    int sz = drained.size();
    LOG.info("Elements drained: " + sz);
    return sz;
  }

  @Override
  public boolean offer(Runnable element) {
    return underlyingQueue.offer(new Call(element));
  }

  @Override
  public boolean offer(Runnable element, long timeout, TimeUnit unit)
      throws InterruptedException {
    return underlyingQueue.offer(new Call(element), timeout, unit);
  }
  @Override
  public void put(Runnable element) throws InterruptedException {
    underlyingQueue.put(new Call(element));
  }

  @Override
  public boolean add(Runnable element) {
    return underlyingQueue.add(new Call(element));
  }

  @Override
  public boolean addAll(Collection<? extends Runnable> elements) {
    int added = 0;
    for (Runnable r : elements) {
      added += underlyingQueue.add(new Call(r)) ? 1 : 0;
    }
    return added != 0;
  }

  @Override
  public Runnable element() {
    return underlyingQueue.element();
  }

  @Override
  public Runnable peek() {
    return underlyingQueue.peek();
  }

  @Override
  public void clear() {
    underlyingQueue.clear();
  }

  @Override
  public boolean containsAll(Collection<?> elements) {
    return underlyingQueue.containsAll(elements);
  }

  @Override
  public boolean isEmpty() {
    return underlyingQueue.isEmpty();
  }

  @Override
  public Iterator<Runnable> iterator() {
    return new Iterator<Runnable>() {
      final Iterator<Call> underlyingIterator = underlyingQueue.iterator();
      @Override
      public Runnable next() {
        return underlyingIterator.next();
      }

      @Override
      public boolean hasNext() {
        return underlyingIterator.hasNext();
      }

      @Override
      public void remove() {
        underlyingIterator.remove();
      }
    };
  }

  @Override
  public boolean removeAll(Collection<?> elements) {
    return underlyingQueue.removeAll(elements);
  }

  @Override
  public boolean retainAll(Collection<?> elements) {
    return underlyingQueue.retainAll(elements);
  }

  @Override
  public int size() {
    return underlyingQueue.size();
  }

  @Override
  public Object[] toArray() {
    return underlyingQueue.toArray();
  }

  @Override
  public <T> T[] toArray(T[] array) {
    return underlyingQueue.toArray(array);
  }

  @Override
  public boolean contains(Object element) {
    return underlyingQueue.contains(element);
  }

  @Override
  public int remainingCapacity() {
    return underlyingQueue.remainingCapacity();
  }

  @Override
  public boolean remove(Object element) {
    return underlyingQueue.remove(element);
  }
}
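Usage sketch (illustrative only, not part of the patch): CallQueue sits between a Thrift server and its ThreadPoolExecutor. Every Runnable handed to the pool is wrapped in a Call that records its enqueue time; when a worker dequeues it, the wait time and current queue length are reported to ThriftMetrics. The pool sizes, the queue bound, and the pre-existing conf object in the snippet are assumptions made for the example.

// Not part of this commit -- a minimal wiring sketch under the assumptions above.
ThriftMetrics metrics = new ThriftMetrics(9090, conf);
CallQueue callQueue = new CallQueue(
    new LinkedBlockingQueue<CallQueue.Call>(1000), metrics);   // bounded request queue
ExecutorService pool = new ThreadPoolExecutor(
    16, 16, 60, TimeUnit.SECONDS, callQueue,
    new ThreadFactoryBuilder().setNameFormat("thrift-worker-%d").build());
// Once all 16 workers are busy, execute() hands tasks to CallQueue.offer(), which
// wraps them in Call objects stamped with their enqueue time; the workers' take()
// and poll() calls then report timeInQueue and callQueueLen to the metrics object.
pool.execute(new Runnable() {
  @Override
  public void run() { /* serve one Thrift request */ }
});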
HbaseHandlerMetricsProxy.java (new file)
@@ -0,0 +1,80 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.thrift;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.thrift.generated.Hbase;

/**
 * Converts a Hbase.Iface using InvocationHandler so that it reports process
 * time of each call to ThriftMetrics.
 */
public class HbaseHandlerMetricsProxy implements InvocationHandler {

  public static final Log LOG = LogFactory.getLog(
      HbaseHandlerMetricsProxy.class);

  private final Hbase.Iface handler;
  private final ThriftMetrics metrics;

  public static Hbase.Iface newInstance(Hbase.Iface handler,
                                        ThriftMetrics metrics,
                                        Configuration conf) {
    return (Hbase.Iface) Proxy.newProxyInstance(
        handler.getClass().getClassLoader(),
        handler.getClass().getInterfaces(),
        new HbaseHandlerMetricsProxy(handler, metrics, conf));
  }

  private HbaseHandlerMetricsProxy(
      Hbase.Iface handler, ThriftMetrics metrics, Configuration conf) {
    this.handler = handler;
    this.metrics = metrics;
  }

  @Override
  public Object invoke(Object proxy, Method m, Object[] args)
      throws Throwable {
    Object result;
    try {
      long start = now();
      result = m.invoke(handler, args);
      int processTime = (int)(now() - start);
      metrics.incMethodTime(m.getName(), processTime);
    } catch (InvocationTargetException e) {
      throw e.getTargetException();
    } catch (Exception e) {
      throw new RuntimeException(
          "unexpected invocation exception: " + e.getMessage());
    }
    return result;
  }

  private static long now() {
    return System.nanoTime();
  }
}
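Usage sketch (illustrative only, not part of the patch): the proxy is installed once, in front of the concrete handler, and everything downstream keeps talking to Hbase.Iface. The listenPort and conf variables are assumed to exist, and the snippet assumes it runs in the org.apache.hadoop.hbase.thrift package (as the tests below do), since HBaseHandler's constructor is protected.

// Not part of this commit -- how the proxy is dropped in front of the real handler
// (ThriftServerRunner's constructor below does exactly this).
ThriftMetrics metrics = new ThriftMetrics(listenPort, conf);
Hbase.Iface rawHandler = new ThriftServerRunner.HBaseHandler(conf);
Hbase.Iface handler = HbaseHandlerMetricsProxy.newInstance(rawHandler, metrics, conf);
// Every Iface call made through 'handler' is timed with System.nanoTime() and fed to
// metrics.incMethodTime(methodName, elapsedNanos); InvocationTargetExceptions are
// unwrapped, so IOError and IllegalArgument still reach the Thrift layer unchanged.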
TBoundedThreadPoolServer.java
@@ -18,7 +18,6 @@

 package org.apache.hadoop.hbase.thrift;

-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
@@ -29,6 +28,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.thrift.CallQueue.Call;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.thrift.TException;
 import org.apache.thrift.TProcessor;
@@ -103,6 +103,8 @@ public class TBoundedThreadPoolServer extends TServer {
   private static final Log LOG = LogFactory.getLog(
       TBoundedThreadPoolServer.class.getName());

+  private final CallQueue callQueue;
+
   public static class Args extends TThreadPoolServer.Args {
     int maxQueuedRequests;
     int threadKeepAliveTimeSec;
@@ -135,15 +137,14 @@ public class TBoundedThreadPoolServer extends TServer {

   private Args serverOptions;

-  public TBoundedThreadPoolServer(Args options) {
+  public TBoundedThreadPoolServer(Args options, ThriftMetrics metrics) {
     super(options);

-    BlockingQueue<Runnable> executorQueue;
     if (options.maxQueuedRequests > 0) {
-      executorQueue = new LinkedBlockingQueue<Runnable>(
-          options.maxQueuedRequests);
+      this.callQueue = new CallQueue(
+          new LinkedBlockingQueue<Call>(options.maxQueuedRequests), metrics);
     } else {
-      executorQueue = new SynchronousQueue<Runnable>();
+      this.callQueue = new CallQueue(new SynchronousQueue<Call>(), metrics);
     }

     ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
@@ -151,11 +152,18 @@ public class TBoundedThreadPoolServer extends TServer {
     tfb.setNameFormat("thrift-worker-%d");
     executorService =
         new ThreadPoolExecutor(options.minWorkerThreads,
-            options.maxWorkerThreads, options.threadKeepAliveTimeSec, TimeUnit.SECONDS,
-            executorQueue, tfb.build());
+            options.maxWorkerThreads, options.threadKeepAliveTimeSec,
+            TimeUnit.SECONDS, this.callQueue, tfb.build());
     serverOptions = options;
   }

+  /**
+   * Return the server working queue
+   */
+  public CallQueue getCallQueue() {
+    return this.callQueue;
+  }
+
   public void serve() {
     try {
       serverTransport_.listen();
ThriftMetrics.java (new file)
@@ -0,0 +1,131 @@
/*
 * Copyright The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */

package org.apache.hadoop.hbase.thrift;

import java.lang.reflect.Method;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.thrift.generated.Hbase;
import org.apache.hadoop.metrics.MetricsContext;
import org.apache.hadoop.metrics.MetricsRecord;
import org.apache.hadoop.metrics.MetricsUtil;
import org.apache.hadoop.metrics.Updater;
import org.apache.hadoop.metrics.util.MetricsBase;
import org.apache.hadoop.metrics.util.MetricsIntValue;
import org.apache.hadoop.metrics.util.MetricsRegistry;
import org.apache.hadoop.metrics.util.MetricsTimeVaryingInt;
import org.apache.hadoop.metrics.util.MetricsTimeVaryingLong;
import org.apache.hadoop.metrics.util.MetricsTimeVaryingRate;

/**
 * This class is for maintaining the various statistics of thrift server
 * and publishing them through the metrics interfaces.
 */
public class ThriftMetrics implements Updater {
  public final static Log LOG = LogFactory.getLog(ThriftMetrics.class);
  public final static String CONTEXT_NAME = "thriftserver";

  private final MetricsContext context;
  private final MetricsRecord metricsRecord;
  private final MetricsRegistry registry = new MetricsRegistry();
  private final long slowResponseTime;
  public static final String SLOW_RESPONSE_NANO_SEC =
    "hbase.thrift.slow.response.nano.second";
  public static final long DEFAULT_SLOW_RESPONSE_NANO_SEC = 10 * 1000 * 1000;

  private final MetricsIntValue callQueueLen =
      new MetricsIntValue("callQueueLen", registry);
  private final MetricsTimeVaryingRate timeInQueue =
      new MetricsTimeVaryingRate("timeInQueue", registry);
  private MetricsTimeVaryingRate thriftCall =
      new MetricsTimeVaryingRate("thriftCall", registry);
  private MetricsTimeVaryingRate slowThriftCall =
      new MetricsTimeVaryingRate("slowThriftCall", registry);

  public ThriftMetrics(int port, Configuration conf) {
    slowResponseTime = conf.getLong(
        SLOW_RESPONSE_NANO_SEC, DEFAULT_SLOW_RESPONSE_NANO_SEC);
    context = MetricsUtil.getContext(CONTEXT_NAME);
    metricsRecord = MetricsUtil.createRecord(context, CONTEXT_NAME);

    metricsRecord.setTag("port", port + "");

    LOG.info("Initializing RPC Metrics with port=" + port);

    context.registerUpdater(this);

    createMetricsForMethods(Hbase.Iface.class);
  }

  public void incTimeInQueue(long time) {
    timeInQueue.inc(time);
  }

  public void setCallQueueLen(int len) {
    callQueueLen.set(len);
  }

  public void incMethodTime(String name, int time) {
    MetricsTimeVaryingRate methodTimeMetrc = getMethodTimeMetrics(name);
    if (methodTimeMetrc == null) {
      LOG.warn(
          "Got incMethodTime() request for method that doesnt exist: " + name);
      return; // ignore methods that dont exist.
    }

    // inc method specific processTime
    methodTimeMetrc.inc(time);

    // inc general processTime
    thriftCall.inc(time);
    if (time > slowResponseTime) {
      slowThriftCall.inc(time);
    }
  }

  private void createMetricsForMethods(Class<?> iface) {
    for (Method m : iface.getDeclaredMethods()) {
      if (getMethodTimeMetrics(m.getName()) == null)
        LOG.debug("Creating metrics for method:" + m.getName());
        createMethodTimeMetrics(m.getName());
    }
  }

  private MetricsTimeVaryingRate getMethodTimeMetrics(String key) {
    return (MetricsTimeVaryingRate) registry.get(key);
  }

  private MetricsTimeVaryingRate createMethodTimeMetrics(String key) {
    return new MetricsTimeVaryingRate(key, this.registry);
  }

  /**
   * Push the metrics to the monitoring subsystem on doUpdate() call.
   */
  public void doUpdates(final MetricsContext context) {
    // getMetricsList() and pushMetric() are thread safe methods
    for (MetricsBase m : registry.getMetricsList()) {
      m.pushMetric(metricsRecord);
    }
    metricsRecord.update();
  }
}
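Note (illustrative only, not part of the patch): each MetricsTimeVaryingRate in the registry publishes <name>_num_ops and <name>_avg_time, which is why the tests later assert on names like createTable_num_ops and timeInQueue_num_ops. The slow-call threshold comes from hbase.thrift.slow.response.nano.second and, despite the ".second" suffix in the key, is compared against nanoseconds (default 10 ms). A small sketch of tuning and exercising it:

// Not part of this commit -- lowering the slow-call threshold to 5 ms (nanoseconds).
Configuration conf = HBaseConfiguration.create();
conf.setLong(ThriftMetrics.SLOW_RESPONSE_NANO_SEC, 5 * 1000 * 1000L);
ThriftMetrics metrics = new ThriftMetrics(9090, conf);
// "createTable" was pre-registered by createMetricsForMethods(Hbase.Iface.class);
// 7 ms exceeds the 5 ms threshold, so slowThriftCall is bumped as well.
metrics.incMethodTime("createTable", 7 * 1000 * 1000);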
ThriftServerRunner.java
@@ -31,6 +31,11 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;

 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Option;
@@ -57,12 +62,13 @@ import org.apache.hadoop.hbase.filter.ParseFilter;
 import org.apache.hadoop.hbase.filter.PrefixFilter;
 import org.apache.hadoop.hbase.filter.WhileMatchFilter;
 import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.thrift.CallQueue.Call;
 import org.apache.hadoop.hbase.thrift.generated.AlreadyExists;
 import org.apache.hadoop.hbase.thrift.generated.BatchMutation;
 import org.apache.hadoop.hbase.thrift.generated.ColumnDescriptor;
 import org.apache.hadoop.hbase.thrift.generated.Hbase;
-import org.apache.hadoop.hbase.thrift.generated.IOError;
 import org.apache.hadoop.hbase.thrift.generated.IllegalArgument;
+import org.apache.hadoop.hbase.thrift.generated.IOError;
 import org.apache.hadoop.hbase.thrift.generated.Mutation;
 import org.apache.hadoop.hbase.thrift.generated.TCell;
 import org.apache.hadoop.hbase.thrift.generated.TRegionInfo;
@@ -89,6 +95,7 @@ import org.apache.thrift.transport.TServerTransport;
 import org.apache.thrift.transport.TTransportFactory;

 import com.google.common.base.Joiner;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;

 /**
  * ThriftServerRunner - this class starts up a Thrift server which implements
@@ -107,11 +114,14 @@ public class ThriftServerRunner implements Runnable {
   static final String PORT_CONF_KEY = "hbase.regionserver.thrift.port";

   private static final String DEFAULT_BIND_ADDR = "0.0.0.0";
-  private static final int DEFAULT_LISTEN_PORT = 9090;
+  public static final int DEFAULT_LISTEN_PORT = 9090;
+  private final int listenPort;

   private Configuration conf;
   volatile TServer tserver;
-  private final HBaseHandler handler;
+  private final Hbase.Iface handler;
+  private final ThriftMetrics metrics;
+  private CallQueue callQueue;

   /** An enum of server implementation selections */
   enum ImplType {
@@ -214,7 +224,10 @@ public class ThriftServerRunner implements Runnable {

   public ThriftServerRunner(Configuration conf, HBaseHandler handler) {
     this.conf = HBaseConfiguration.create(conf);
-    this.handler = handler;
+    this.listenPort = conf.getInt(PORT_CONF_KEY, DEFAULT_LISTEN_PORT);
+    this.metrics = new ThriftMetrics(listenPort, conf);
+    this.handler = HbaseHandlerMetricsProxy.newInstance(handler, metrics, conf);
+
   }

   /*
@@ -243,9 +256,6 @@ public class ThriftServerRunner implements Runnable {
    * Setting up the thrift TServer
    */
   private void setupServer() throws Exception {
-    // Get port to bind to
-    int listenPort = conf.getInt(PORT_CONF_KEY, DEFAULT_LISTEN_PORT);
-
     // Construct correct ProtocolFactory
     TProtocolFactory protocolFactory;
     if (conf.getBoolean(COMPACT_CONF_KEY, false)) {
@@ -293,14 +303,22 @@ public class ThriftServerRunner implements Runnable {
       tserver = new TNonblockingServer(serverArgs);
     } else if (implType == ImplType.HS_HA) {
       THsHaServer.Args serverArgs = new THsHaServer.Args(serverTransport);
-      serverArgs.processor(processor)
+      this.callQueue = new CallQueue(new LinkedBlockingQueue<Call>(), metrics);
+      ExecutorService executorService = createExecutor(
+          this.callQueue, serverArgs.getWorkerThreads());
+      serverArgs.executorService(executorService)
+                .processor(processor)
                 .transportFactory(transportFactory)
                 .protocolFactory(protocolFactory);
       tserver = new THsHaServer(serverArgs);
     } else { // THREADED_SELECTOR
       TThreadedSelectorServer.Args serverArgs =
           new HThreadedSelectorServerArgs(serverTransport, conf);
-      serverArgs.processor(processor)
+      this.callQueue = new CallQueue(new LinkedBlockingQueue<Call>(), metrics);
+      ExecutorService executorService = createExecutor(
+          this.callQueue, serverArgs.getWorkerThreads());
+      serverArgs.executorService(executorService)
+                .processor(processor)
                 .transportFactory(transportFactory)
                 .protocolFactory(protocolFactory);
       tserver = new TThreadedSelectorServer(serverArgs);
@@ -322,7 +340,10 @@ public class ThriftServerRunner implements Runnable {
       LOG.info("starting " + ImplType.THREAD_POOL.simpleClassName() + " on "
           + listenAddress + ":" + Integer.toString(listenPort)
          + "; " + serverArgs);
-      tserver = new TBoundedThreadPoolServer(serverArgs);
+      TBoundedThreadPoolServer tserver =
+          new TBoundedThreadPoolServer(serverArgs, metrics);
+      this.callQueue = tserver.getCallQueue();
+      this.tserver = tserver;
     } else {
       throw new AssertionError("Unsupported Thrift server implementation: " +
           implType.simpleClassName());
@@ -336,7 +357,6 @@ public class ThriftServerRunner implements Runnable {
     }

     // login the server principal (if using secure Hadoop)
-    Configuration conf = handler.conf;
     if (User.isSecurityEnabled() && User.isHBaseSecurityEnabled(conf)) {
       String machineName = Strings.domainNamePointerToHostName(
           DNS.getDefaultHost(conf.get("hbase.thrift.dns.interface", "default"),
@@ -346,6 +366,15 @@ public class ThriftServerRunner implements Runnable {
       }
     }

+  ExecutorService createExecutor(BlockingQueue<Runnable> callQueue,
+                                 int workerThreads) {
+    ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
+    tfb.setDaemon(true);
+    tfb.setNameFormat("thrift-worker-%d");
+    return new ThreadPoolExecutor(workerThreads, workerThreads,
+            Long.MAX_VALUE, TimeUnit.SECONDS, callQueue, tfb.build());
+  }
+
   private InetAddress getBindAddress(Configuration conf)
       throws UnknownHostException {
     String bindAddressStr = conf.get(BIND_CONF_KEY, DEFAULT_BIND_ADDR);
@@ -456,8 +485,7 @@ public class ThriftServerRunner implements Runnable {
       this(HBaseConfiguration.create());
     }

-    protected HBaseHandler(final Configuration c)
-    throws IOException {
+    protected HBaseHandler(final Configuration c) throws IOException {
       this.conf = c;
       admin = new HBaseAdmin(conf);
       scannerMap = new HashMap<Integer, ResultScanner>();
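Note (illustrative only, not part of the patch): the metrics land in the "thriftserver" context (ThriftMetrics.CONTEXT_NAME), so they can be routed through hadoop-metrics.properties like any other Hadoop metrics-v1 context. A sketch of what the new constructor wires up end to end; the conf object, thread name, and running in the org.apache.hadoop.hbase.thrift package are assumptions:

// Not part of this commit -- what the reworked constructor sets up, assuming 'conf'
// carries hbase.regionserver.thrift.port and a valid server impl type.
ThriftServerRunner.HBaseHandler rawHandler =
    new ThriftServerRunner.HBaseHandler(conf);
ThriftServerRunner runner = new ThriftServerRunner(conf, rawHandler);
// The runner now owns a ThriftMetrics bound to the listen port, wraps the handler in
// HbaseHandlerMetricsProxy, and routes the worker queue through a CallQueue (for the
// thread-pool, HS_HA, and threaded-selector servers), so per-method latency, queue
// length, and time-in-queue are all reported.
new Thread(runner, "thrift-server").start();   // ThriftServerRunner implements Runnable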
TestCallQueue.java (new file)
@@ -0,0 +1,142 @@
/*
 * Copyright The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package org.apache.hadoop.hbase.thrift;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.thrift.CallQueue.Call;
import org.apache.hadoop.metrics.ContextFactory;
import org.apache.hadoop.metrics.MetricsContext;
import org.apache.hadoop.metrics.MetricsUtil;
import org.apache.hadoop.metrics.spi.NoEmitMetricsContext;
import org.apache.hadoop.metrics.spi.OutputRecord;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.junit.Test;

/**
 * Unit testing for CallQueue, a part of the
 * org.apache.hadoop.hbase.thrift package.
 */
@Category(SmallTests.class)
@RunWith(Parameterized.class)
public class TestCallQueue {

  public static final Log LOG = LogFactory.getLog(TestCallQueue.class);
  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();

  private int elementsAdded;
  private int elementsRemoved;

  @Parameters
  public static Collection<Object[]> getParameters() {
    Collection<Object[]> parameters = new ArrayList<Object[]>();
    for (int elementsAdded : new int[] {100, 200, 300}) {
      for (int elementsRemoved : new int[] {0, 20, 100}) {
        parameters.add(new Object[]{new Integer(elementsAdded),
                                    new Integer(elementsRemoved)});
      }
    }
    return parameters;
  }

  public TestCallQueue(int elementsAdded, int elementsRemoved) {
    this.elementsAdded = elementsAdded;
    this.elementsRemoved = elementsRemoved;
    LOG.debug("elementsAdded:" + elementsAdded +
              " elementsRemoved:" + elementsRemoved);
  }

  @Test(timeout=3000)
  public void testPutTake() throws Exception {
    ThriftMetrics metrics = createMetrics();
    CallQueue callQueue = new CallQueue(
        new LinkedBlockingQueue<Call>(), metrics);
    for (int i = 0; i < elementsAdded; ++i) {
      callQueue.put(createDummyRunnable());
    }
    for (int i = 0; i < elementsRemoved; ++i) {
      callQueue.take();
    }
    verifyMetrics(metrics, "timeInQueue_num_ops", elementsRemoved);
  }

  @Test(timeout=3000)
  public void testOfferPoll() throws Exception {
    ThriftMetrics metrics = createMetrics();
    CallQueue callQueue = new CallQueue(
        new LinkedBlockingQueue<Call>(), metrics);
    for (int i = 0; i < elementsAdded; ++i) {
      callQueue.offer(createDummyRunnable());
    }
    for (int i = 0; i < elementsRemoved; ++i) {
      callQueue.poll();
    }
    verifyMetrics(metrics, "timeInQueue_num_ops", elementsRemoved);
  }

  private static ThriftMetrics createMetrics() throws Exception {
    setupMetricsContext();
    Configuration conf = UTIL.getConfiguration();
    return new ThriftMetrics(ThriftServerRunner.DEFAULT_LISTEN_PORT, conf);
  }

  private static void setupMetricsContext() throws Exception {
    ContextFactory factory = ContextFactory.getFactory();
    factory.setAttribute(ThriftMetrics.CONTEXT_NAME + ".class",
        NoEmitMetricsContext.class.getName());
    MetricsUtil.getContext(ThriftMetrics.CONTEXT_NAME)
               .createRecord(ThriftMetrics.CONTEXT_NAME).remove();
  }

  private static void verifyMetrics(ThriftMetrics metrics, String name, int expectValue)
      throws Exception {
    MetricsContext context = MetricsUtil.getContext(
        ThriftMetrics.CONTEXT_NAME);
    metrics.doUpdates(context);
    OutputRecord record = context.getAllRecords().get(
        ThriftMetrics.CONTEXT_NAME).iterator().next();
    assertEquals(expectValue, record.getMetric(name).intValue());
  }

  private static Runnable createDummyRunnable() {
    return new Runnable() {
      @Override
      public void run() {
      }
    };
  }

  @org.junit.Rule
  public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =
    new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();
}
TestThriftServer.java
@@ -19,14 +19,20 @@
  */
 package org.apache.hadoop.hbase.thrift;

-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;

+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
+import java.util.Collection;

-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.MediumTests;
+import org.junit.experimental.categories.Category;
+import org.junit.Test;
+import org.junit.BeforeClass;
 import org.apache.hadoop.hbase.thrift.generated.BatchMutation;
 import org.apache.hadoop.hbase.thrift.generated.ColumnDescriptor;
 import org.apache.hadoop.hbase.thrift.generated.Hbase;
@@ -34,10 +40,15 @@ import org.apache.hadoop.hbase.thrift.generated.Mutation;
 import org.apache.hadoop.hbase.thrift.generated.TCell;
 import org.apache.hadoop.hbase.thrift.generated.TRowResult;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.metrics.ContextFactory;
+import org.apache.hadoop.metrics.MetricsContext;
+import org.apache.hadoop.metrics.MetricsUtil;
+import org.apache.hadoop.metrics.spi.NoEmitMetricsContext;
+import org.apache.hadoop.metrics.spi.OutputRecord;
 import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.conf.Configuration;

 /**
  * Unit testing for ThriftServerRunner.HBaseHandler, a part of the
@@ -82,9 +93,11 @@ public class TestThriftServer {
    *
    * @throws Exception
    */
+  @Test
   public void testAll() throws Exception {
     // Run all tests
     doTestTableCreateDrop();
+    doTestThriftMetrics();
     doTestTableMutations();
     doTestTableTimestampsAndColumns();
     doTestTableScanners();
@@ -98,7 +111,6 @@ public class TestThriftServer {
    *
    * @throws Exception
    */
-  @Test
   public void doTestTableCreateDrop() throws Exception {
     ThriftServerRunner.HBaseHandler handler =
         new ThriftServerRunner.HBaseHandler(UTIL.getConfiguration());
@@ -106,6 +118,50 @@ public class TestThriftServer {
     dropTestTables(handler);
   }

+  /**
+   * Tests if the metrics for thrift handler work correctly
+   */
+  public void doTestThriftMetrics() throws Exception {
+    Configuration conf = UTIL.getConfiguration();
+    ThriftMetrics metrics = getMetrics(conf);
+    Hbase.Iface handler = getHandler(metrics, conf);
+    createTestTables(handler);
+    dropTestTables(handler);
+    verifyMetrics(metrics, "createTable_num_ops", 2);
+    verifyMetrics(metrics, "deleteTable_num_ops", 2);
+    verifyMetrics(metrics, "disableTable_num_ops", 2);
+  }
+
+  private static Hbase.Iface getHandler(ThriftMetrics metrics, Configuration conf)
+      throws Exception {
+    Hbase.Iface handler =
+      new ThriftServerRunner.HBaseHandler(conf);
+    return HbaseHandlerMetricsProxy.newInstance(handler, metrics, conf);
+  }
+
+  private static ThriftMetrics getMetrics(Configuration conf) throws Exception {
+    setupMetricsContext();
+    return new ThriftMetrics(ThriftServerRunner.DEFAULT_LISTEN_PORT, conf);
+  }
+
+  private static void setupMetricsContext() throws IOException {
+    ContextFactory factory = ContextFactory.getFactory();
+    factory.setAttribute(ThriftMetrics.CONTEXT_NAME + ".class",
+        NoEmitMetricsContext.class.getName());
+    MetricsUtil.getContext(ThriftMetrics.CONTEXT_NAME)
+               .createRecord(ThriftMetrics.CONTEXT_NAME).remove();
+  }
+
+  private static void verifyMetrics(ThriftMetrics metrics, String name, int expectValue)
+      throws Exception {
+    MetricsContext context = MetricsUtil.getContext(
+        ThriftMetrics.CONTEXT_NAME);
+    metrics.doUpdates(context);
+    OutputRecord record = context.getAllRecords().get(
+        ThriftMetrics.CONTEXT_NAME).iterator().next();
+    assertEquals(expectValue, record.getMetric(name).intValue());
+  }
+
   public static void createTestTables(Hbase.Iface handler) throws Exception {
     // Create/enable/disable/delete tables, ensure methods act correctly
     assertEquals(handler.getTableNames().size(), 0);