Revert "HBASE-15967 Metric for active ipc Readers and make default fraction of cpu count"
Revert mistaken commit
This reverts commit 1125215aad
.
This commit is contained in:
parent
2da090f9a3
commit
6d5a25935e
|
@ -86,13 +86,6 @@ public interface MetricsHBaseServerSource extends BaseSource {
|
||||||
String EXCEPTIONS_MULTI_TOO_LARGE_DESC = "A response to a multi request was too large and the " +
|
String EXCEPTIONS_MULTI_TOO_LARGE_DESC = "A response to a multi request was too large and the " +
|
||||||
"rest of the requests will have to be retried.";
|
"rest of the requests will have to be retried.";
|
||||||
|
|
||||||
String RUNNING_READERS = "runningReaders";
|
|
||||||
String RUNNING_READERS_DESCRIPTION =
|
|
||||||
"Count of Reader threads currently busy parsing requests to hand off to the scheduler";
|
|
||||||
|
|
||||||
void incrRunningReaders();
|
|
||||||
void decrRunningReaders();
|
|
||||||
|
|
||||||
void authorizationSuccess();
|
void authorizationSuccess();
|
||||||
|
|
||||||
void authorizationFailure();
|
void authorizationFailure();
|
||||||
|
@ -129,4 +122,6 @@ public interface MetricsHBaseServerSource extends BaseSource {
|
||||||
void processedCall(int processingTime);
|
void processedCall(int processingTime);
|
||||||
|
|
||||||
void queuedAndProcessedCall(int totalTime);
|
void queuedAndProcessedCall(int totalTime);
|
||||||
}
|
|
||||||
|
|
||||||
|
}
|
||||||
|
|
|
@ -57,12 +57,6 @@ public class MetricsHBaseServerSourceImpl extends BaseSourceImpl
|
||||||
private MetricHistogram requestSize;
|
private MetricHistogram requestSize;
|
||||||
private MetricHistogram responseSize;
|
private MetricHistogram responseSize;
|
||||||
|
|
||||||
/**
|
|
||||||
* The count of readers currently working parsing a request as opposed to being blocked on the
|
|
||||||
* selector waiting on requests to come in.
|
|
||||||
*/
|
|
||||||
private final MutableFastCounter runningReaders;
|
|
||||||
|
|
||||||
public MetricsHBaseServerSourceImpl(String metricsName,
|
public MetricsHBaseServerSourceImpl(String metricsName,
|
||||||
String metricsDescription,
|
String metricsDescription,
|
||||||
String metricsContext,
|
String metricsContext,
|
||||||
|
@ -92,9 +86,6 @@ public class MetricsHBaseServerSourceImpl extends BaseSourceImpl
|
||||||
this.exceptionsMultiTooLarge = this.getMetricsRegistry()
|
this.exceptionsMultiTooLarge = this.getMetricsRegistry()
|
||||||
.newCounter(EXCEPTIONS_MULTI_TOO_LARGE_NAME, EXCEPTIONS_MULTI_TOO_LARGE_DESC, 0L);
|
.newCounter(EXCEPTIONS_MULTI_TOO_LARGE_NAME, EXCEPTIONS_MULTI_TOO_LARGE_DESC, 0L);
|
||||||
|
|
||||||
this.runningReaders = this.getMetricsRegistry()
|
|
||||||
.newCounter(RUNNING_READERS, RUNNING_READERS_DESCRIPTION, 0L);
|
|
||||||
|
|
||||||
this.authenticationSuccesses = this.getMetricsRegistry().newCounter(
|
this.authenticationSuccesses = this.getMetricsRegistry().newCounter(
|
||||||
AUTHENTICATION_SUCCESSES_NAME, AUTHENTICATION_SUCCESSES_DESC, 0L);
|
AUTHENTICATION_SUCCESSES_NAME, AUTHENTICATION_SUCCESSES_DESC, 0L);
|
||||||
this.authenticationFailures = this.getMetricsRegistry().newCounter(AUTHENTICATION_FAILURES_NAME,
|
this.authenticationFailures = this.getMetricsRegistry().newCounter(AUTHENTICATION_FAILURES_NAME,
|
||||||
|
@ -117,16 +108,6 @@ public class MetricsHBaseServerSourceImpl extends BaseSourceImpl
|
||||||
RESPONSE_SIZE_DESC);
|
RESPONSE_SIZE_DESC);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public void incrRunningReaders() {
|
|
||||||
this.runningReaders.incr(+1);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void decrRunningReaders() {
|
|
||||||
this.runningReaders.incr(-1);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void authorizationSuccess() {
|
public void authorizationSuccess() {
|
||||||
authorizationSuccesses.incr();
|
authorizationSuccesses.incr();
|
||||||
|
|
|
@ -625,8 +625,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
||||||
|
|
||||||
public Listener(final String name) throws IOException {
|
public Listener(final String name) throws IOException {
|
||||||
super(name);
|
super(name);
|
||||||
// The backlog of requests that we will have the serversocket carry. It is not enough
|
// The backlog of requests that we will have the serversocket carry.
|
||||||
// just setting this config. You need to set the backlog in the kernel too.
|
|
||||||
int backlogLength = conf.getInt("hbase.ipc.server.listen.queue.size", 128);
|
int backlogLength = conf.getInt("hbase.ipc.server.listen.queue.size", 128);
|
||||||
// Create a new server socket and set to non blocking mode
|
// Create a new server socket and set to non blocking mode
|
||||||
acceptChannel = ServerSocketChannel.open();
|
acceptChannel = ServerSocketChannel.open();
|
||||||
|
@ -691,12 +690,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
||||||
iter.remove();
|
iter.remove();
|
||||||
if (key.isValid()) {
|
if (key.isValid()) {
|
||||||
if (key.isReadable()) {
|
if (key.isReadable()) {
|
||||||
metrics.getMetricsSource().incrRunningReaders();
|
doRead(key);
|
||||||
try {
|
|
||||||
doRead(key);
|
|
||||||
} finally {
|
|
||||||
metrics.getMetricsSource().decrRunningReaders();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
key = null;
|
key = null;
|
||||||
|
@ -740,9 +734,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
||||||
iter.remove();
|
iter.remove();
|
||||||
try {
|
try {
|
||||||
if (key.isValid()) {
|
if (key.isValid()) {
|
||||||
if (key.isAcceptable()) {
|
if (key.isAcceptable())
|
||||||
doAccept(key);
|
doAccept(key);
|
||||||
}
|
|
||||||
}
|
}
|
||||||
} catch (IOException ignored) {
|
} catch (IOException ignored) {
|
||||||
if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored);
|
if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored);
|
||||||
|
@ -837,8 +830,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
||||||
try {
|
try {
|
||||||
count = c.readAndProcess();
|
count = c.readAndProcess();
|
||||||
} catch (InterruptedException ieo) {
|
} catch (InterruptedException ieo) {
|
||||||
LOG.info(Thread.currentThread().getName() +
|
LOG.info(Thread.currentThread().getName() + ": readAndProcess caught InterruptedException", ieo);
|
||||||
": readAndProcess caught InterruptedException", ieo);
|
|
||||||
throw ieo;
|
throw ieo;
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
|
@ -1167,7 +1159,6 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
||||||
private ByteBuffer dataLengthBuffer;
|
private ByteBuffer dataLengthBuffer;
|
||||||
protected final ConcurrentLinkedDeque<Call> responseQueue = new ConcurrentLinkedDeque<Call>();
|
protected final ConcurrentLinkedDeque<Call> responseQueue = new ConcurrentLinkedDeque<Call>();
|
||||||
private final Lock responseWriteLock = new ReentrantLock();
|
private final Lock responseWriteLock = new ReentrantLock();
|
||||||
// EXPENSIVE: Counters cost lots of CPU. Remove. Used just to see if idle or not. Use boolean.
|
|
||||||
private Counter rpcCount = new Counter(); // number of outstanding rpcs
|
private Counter rpcCount = new Counter(); // number of outstanding rpcs
|
||||||
private long lastContact;
|
private long lastContact;
|
||||||
private InetAddress addr;
|
private InetAddress addr;
|
||||||
|
@ -2009,11 +2000,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
||||||
// See declaration above for documentation on what this size is.
|
// See declaration above for documentation on what this size is.
|
||||||
this.maxQueueSizeInBytes =
|
this.maxQueueSizeInBytes =
|
||||||
this.conf.getLong("hbase.ipc.server.max.callqueue.size", DEFAULT_MAX_CALLQUEUE_SIZE);
|
this.conf.getLong("hbase.ipc.server.max.callqueue.size", DEFAULT_MAX_CALLQUEUE_SIZE);
|
||||||
// Have the Reader thread count default to 1/4 of the processors. This seems to do pretty
|
this.readThreads = conf.getInt("hbase.ipc.server.read.threadpool.size", 10);
|
||||||
// well. See the metric hbase.regionserver.ipc.runningReaders to see if you need to change it.
|
|
||||||
int processors = Runtime.getRuntime().availableProcessors();
|
|
||||||
this.readThreads = conf.getInt("hbase.ipc.server.read.threadpool.size",
|
|
||||||
Math.max(8, processors/ 4));
|
|
||||||
this.purgeTimeout = conf.getLong("hbase.ipc.client.call.purge.timeout",
|
this.purgeTimeout = conf.getLong("hbase.ipc.client.call.purge.timeout",
|
||||||
2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
|
2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
|
||||||
this.warnResponseTime = conf.getInt(WARN_RESPONSE_TIME, DEFAULT_WARN_RESPONSE_TIME);
|
this.warnResponseTime = conf.getInt(WARN_RESPONSE_TIME, DEFAULT_WARN_RESPONSE_TIME);
|
||||||
|
|
Loading…
Reference in New Issue