HBASE-17627 Active workers metric for thrift (Ashu Pachauri)

Signed-off-by: Gary Helmling <garyh@apache.org>
This commit is contained in:
Ashu Pachauri 2017-02-13 11:27:51 -08:00 committed by Gary Helmling
parent de23d306eb
commit d2c083d21c
7 changed files with 110 additions and 8 deletions

View File

@ -32,6 +32,7 @@ public interface MetricsThriftServerSource extends ExceptionTrackingSource, JvmP
String THRIFT_CALL_KEY = "thriftCall"; String THRIFT_CALL_KEY = "thriftCall";
String SLOW_THRIFT_CALL_KEY = "slowThriftCall"; String SLOW_THRIFT_CALL_KEY = "slowThriftCall";
String CALL_QUEUE_LEN_KEY = "callQueueLen"; String CALL_QUEUE_LEN_KEY = "callQueueLen";
String ACTIVE_WORKER_COUNT_KEY = "numActiveWorkers";
/** /**
* Add how long an operation was in the queue. * Add how long an operation was in the queue.
@ -75,4 +76,14 @@ public interface MetricsThriftServerSource extends ExceptionTrackingSource, JvmP
* @param time Time * @param time Time
*/ */
void incSlowCall(long time); void incSlowCall(long time);
/**
* Increment number of active thrift workers.
*/
void incActiveWorkerCount();
/**
* Decrement number of active thrift workers.
*/
void decActiveWorkerCount();
} }

View File

@ -44,6 +44,8 @@ public class MetricsThriftServerSourceImpl extends ExceptionTrackingSourceImpl i
private MutableGaugeLong callQueueLenGauge; private MutableGaugeLong callQueueLenGauge;
private MutableGaugeLong activeWorkerCountGauge;
// pause monitor metrics // pause monitor metrics
private final MutableFastCounter infoPauseThresholdExceeded; private final MutableFastCounter infoPauseThresholdExceeded;
private final MutableFastCounter warnPauseThresholdExceeded; private final MutableFastCounter warnPauseThresholdExceeded;
@ -74,6 +76,7 @@ public class MetricsThriftServerSourceImpl extends ExceptionTrackingSourceImpl i
thriftCallStat = getMetricsRegistry().newTimeHistogram(THRIFT_CALL_KEY); thriftCallStat = getMetricsRegistry().newTimeHistogram(THRIFT_CALL_KEY);
thriftSlowCallStat = getMetricsRegistry().newTimeHistogram(SLOW_THRIFT_CALL_KEY); thriftSlowCallStat = getMetricsRegistry().newTimeHistogram(SLOW_THRIFT_CALL_KEY);
callQueueLenGauge = getMetricsRegistry().getGauge(CALL_QUEUE_LEN_KEY, 0); callQueueLenGauge = getMetricsRegistry().getGauge(CALL_QUEUE_LEN_KEY, 0);
activeWorkerCountGauge = getMetricsRegistry().getGauge(ACTIVE_WORKER_COUNT_KEY, 0);
} }
@Override @Override
@ -112,6 +115,16 @@ public class MetricsThriftServerSourceImpl extends ExceptionTrackingSourceImpl i
thriftSlowCallStat.add(time); thriftSlowCallStat.add(time);
} }
@Override
public void incActiveWorkerCount() {
activeWorkerCountGauge.incr();
}
@Override
public void decActiveWorkerCount() {
activeWorkerCountGauge.decr();
}
@Override @Override
public void incInfoThresholdExceeded(int count) { public void incInfoThresholdExceeded(int count) {
infoPauseThresholdExceeded.incr(count); infoPauseThresholdExceeded.incr(count);

View File

@ -156,9 +156,9 @@ public class TBoundedThreadPoolServer extends TServer {
tfb.setDaemon(true); tfb.setDaemon(true);
tfb.setNameFormat("thrift-worker-%d"); tfb.setNameFormat("thrift-worker-%d");
executorService = executorService =
new ThreadPoolExecutor(minWorkerThreads, new THBaseThreadPoolExecutor(minWorkerThreads,
maxWorkerThreads, options.threadKeepAliveTimeSec, maxWorkerThreads, options.threadKeepAliveTimeSec,
TimeUnit.SECONDS, this.callQueue, tfb.build()); TimeUnit.SECONDS, this.callQueue, tfb.build(), metrics);
executorService.allowCoreThreadTimeOut(true); executorService.allowCoreThreadTimeOut(true);
serverOptions = options; serverOptions = options;
} }

View File

@ -0,0 +1,61 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.thrift;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* A ThreadPoolExecutor customized for working with HBase thrift to update metrics before and
* after the execution of a task.
*/
public class THBaseThreadPoolExecutor extends ThreadPoolExecutor {
private ThriftMetrics metrics;
public THBaseThreadPoolExecutor(int corePoolSize, int maxPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue, ThriftMetrics metrics) {
this(corePoolSize, maxPoolSize, keepAliveTime, unit, workQueue, null, metrics);
}
public THBaseThreadPoolExecutor(int corePoolSize, int maxPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,ThriftMetrics metrics) {
super(corePoolSize, maxPoolSize, keepAliveTime, unit, workQueue);
if (threadFactory != null) {
setThreadFactory(threadFactory);
}
this.metrics = metrics;
}
@Override
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
metrics.incActiveWorkerCount();
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
metrics.decActiveWorkerCount();
super.afterExecute(r, t);
}
}

View File

@ -99,6 +99,14 @@ public class ThriftMetrics {
} }
} }
public void incActiveWorkerCount() {
source.incActiveWorkerCount();
}
public void decActiveWorkerCount() {
source.decActiveWorkerCount();
}
/** /**
* Increment the count for a specific exception type. This is called for each exception type * Increment the count for a specific exception type. This is called for each exception type
* that is returned to the thrift handler. * that is returned to the thrift handler.

View File

@ -659,8 +659,8 @@ public class ThriftServerRunner implements Runnable {
ThreadFactoryBuilder tfb = new ThreadFactoryBuilder(); ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
tfb.setDaemon(true); tfb.setDaemon(true);
tfb.setNameFormat("thrift-worker-%d"); tfb.setNameFormat("thrift-worker-%d");
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(minWorkers, maxWorkers, ThreadPoolExecutor threadPool = new THBaseThreadPoolExecutor(minWorkers, maxWorkers,
Long.MAX_VALUE, TimeUnit.SECONDS, callQueue, tfb.build()); Long.MAX_VALUE, TimeUnit.SECONDS, callQueue, tfb.build(), metrics);
threadPool.allowCoreThreadTimeOut(true); threadPool.allowCoreThreadTimeOut(true);
return threadPool; return threadPool;
} }

View File

@ -28,6 +28,7 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -59,6 +60,7 @@ import org.apache.hadoop.hbase.security.SecurityUtil;
import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.thrift.CallQueue; import org.apache.hadoop.hbase.thrift.CallQueue;
import org.apache.hadoop.hbase.thrift.CallQueue.Call; import org.apache.hadoop.hbase.thrift.CallQueue.Call;
import org.apache.hadoop.hbase.thrift.THBaseThreadPoolExecutor;
import org.apache.hadoop.hbase.thrift.ThriftMetrics; import org.apache.hadoop.hbase.thrift.ThriftMetrics;
import org.apache.hadoop.hbase.thrift2.generated.THBaseService; import org.apache.hadoop.hbase.thrift2.generated.THBaseService;
import org.apache.hadoop.hbase.util.DNS; import org.apache.hadoop.hbase.util.DNS;
@ -312,8 +314,8 @@ public class ThriftServer extends Configured implements Tool {
ThreadFactoryBuilder tfb = new ThreadFactoryBuilder(); ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
tfb.setDaemon(true); tfb.setDaemon(true);
tfb.setNameFormat("thrift2-worker-%d"); tfb.setNameFormat("thrift2-worker-%d");
ThreadPoolExecutor pool = new ThreadPoolExecutor(workerThreads, workerThreads, ThreadPoolExecutor pool = new THBaseThreadPoolExecutor(workerThreads, workerThreads,
Long.MAX_VALUE, TimeUnit.SECONDS, callQueue, tfb.build()); Long.MAX_VALUE, TimeUnit.SECONDS, callQueue, tfb.build(), metrics);
pool.prestartAllCoreThreads(); pool.prestartAllCoreThreads();
return pool; return pool;
} }
@ -324,7 +326,8 @@ public class ThriftServer extends Configured implements Tool {
int workerThreads, int workerThreads,
InetSocketAddress inetSocketAddress, InetSocketAddress inetSocketAddress,
int backlog, int backlog,
int clientTimeout) int clientTimeout,
ThriftMetrics metrics)
throws TTransportException { throws TTransportException {
TServerTransport serverTransport = new TServerSocket( TServerTransport serverTransport = new TServerSocket(
new TServerSocket.ServerSocketTransportArgs(). new TServerSocket.ServerSocketTransportArgs().
@ -338,6 +341,11 @@ public class ThriftServer extends Configured implements Tool {
if (workerThreads > 0) { if (workerThreads > 0) {
serverArgs.maxWorkerThreads(workerThreads); serverArgs.maxWorkerThreads(workerThreads);
} }
ThreadPoolExecutor executor = new THBaseThreadPoolExecutor(serverArgs.minWorkerThreads,
serverArgs.maxWorkerThreads, serverArgs.stopTimeoutVal, TimeUnit.SECONDS,
new SynchronousQueue<>(), metrics);
serverArgs.executorService(executor);
return new TThreadPoolServer(serverArgs); return new TThreadPoolServer(serverArgs);
} }
@ -574,7 +582,8 @@ public class ThriftServer extends Configured implements Tool {
workerThreads, workerThreads,
inetSocketAddress, inetSocketAddress,
backlog, backlog,
readTimeout); readTimeout,
metrics);
} }
final TServer tserver = server; final TServer tserver = server;