HBASE-23185 Fix high cpu usage because getTable()#put() gets config value every time [Take2] (#748)
* HBASE-23185 Fix test failure by HBASE-23185 changes
* HBASE-23185 Fix high cpu usage because getTable()#put() gets config value every time
This reverts commit db2ce23a93
.
This commit is contained in:
parent
577db5d7e5
commit
3c7c1b5489
|
@ -323,35 +323,60 @@ class AsyncProcess {
|
|||
|
||||
this.id = COUNTER.incrementAndGet();
|
||||
|
||||
this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
|
||||
HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
|
||||
long configuredPauseForCQTBE = conf.getLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, pause);
|
||||
if (configuredPauseForCQTBE < pause) {
|
||||
LOG.warn("The " + HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE + " setting: "
|
||||
+ configuredPauseForCQTBE + " is smaller than " + HConstants.HBASE_CLIENT_PAUSE
|
||||
+ ", will use " + pause + " instead.");
|
||||
this.pauseForCQTBE = pause;
|
||||
} else {
|
||||
this.pauseForCQTBE = configuredPauseForCQTBE;
|
||||
ConnectionConfiguration connConf =
|
||||
hc.getConfiguration() == conf
|
||||
? hc.getConnectionConfiguration()
|
||||
// Slow: parse conf in ConnectionConfiguration constructor
|
||||
: new ConnectionConfiguration(conf);
|
||||
if (connConf == null) {
|
||||
// Slow: parse conf in ConnectionConfiguration constructor
|
||||
connConf = new ConnectionConfiguration(conf);
|
||||
}
|
||||
this.numTries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
|
||||
HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
|
||||
this.rpcTimeout = rpcTimeout;
|
||||
this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
|
||||
HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
|
||||
this.primaryCallTimeoutMicroseconds = conf.getInt(PRIMARY_CALL_TIMEOUT_KEY, 10000);
|
||||
|
||||
this.maxTotalConcurrentTasks = conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
|
||||
HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS);
|
||||
this.maxConcurrentTasksPerServer = conf.getInt(HConstants.HBASE_CLIENT_MAX_PERSERVER_TASKS,
|
||||
HConstants.DEFAULT_HBASE_CLIENT_MAX_PERSERVER_TASKS);
|
||||
this.maxConcurrentTasksPerRegion = conf.getInt(HConstants.HBASE_CLIENT_MAX_PERREGION_TASKS,
|
||||
HConstants.DEFAULT_HBASE_CLIENT_MAX_PERREGION_TASKS);
|
||||
this.maxHeapSizePerRequest = conf.getLong(HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE,
|
||||
DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE);
|
||||
this.maxHeapSizeSubmit = conf.getLong(HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE, DEFAULT_HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE);
|
||||
this.pause = connConf.getPause();
|
||||
this.pauseForCQTBE = connConf.getPauseForCQTBE();
|
||||
|
||||
this.numTries = connConf.getRetriesNumber();
|
||||
this.rpcTimeout = rpcTimeout;
|
||||
this.operationTimeout = connConf.getOperationTimeout();
|
||||
|
||||
// Parse config once and reuse config values of hc's AsyncProcess in AsyncProcess for put
|
||||
// Can be null when constructing hc's AsyncProcess or it's not reusable
|
||||
AsyncProcess globalAsyncProcess = hc.getConfiguration() == conf ? hc.getAsyncProcess() : null;
|
||||
|
||||
this.primaryCallTimeoutMicroseconds =
|
||||
globalAsyncProcess == null
|
||||
? conf.getInt(PRIMARY_CALL_TIMEOUT_KEY, 10000)
|
||||
: globalAsyncProcess.primaryCallTimeoutMicroseconds;
|
||||
|
||||
this.maxTotalConcurrentTasks =
|
||||
globalAsyncProcess == null
|
||||
? conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
|
||||
HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS)
|
||||
: globalAsyncProcess.maxTotalConcurrentTasks;
|
||||
this.maxConcurrentTasksPerServer =
|
||||
globalAsyncProcess == null
|
||||
? conf.getInt(HConstants.HBASE_CLIENT_MAX_PERSERVER_TASKS,
|
||||
HConstants.DEFAULT_HBASE_CLIENT_MAX_PERSERVER_TASKS)
|
||||
: globalAsyncProcess.maxConcurrentTasksPerServer;
|
||||
this.maxConcurrentTasksPerRegion =
|
||||
globalAsyncProcess == null
|
||||
? conf.getInt(HConstants.HBASE_CLIENT_MAX_PERREGION_TASKS,
|
||||
HConstants.DEFAULT_HBASE_CLIENT_MAX_PERREGION_TASKS)
|
||||
: globalAsyncProcess.maxConcurrentTasksPerRegion;
|
||||
this.maxHeapSizePerRequest =
|
||||
globalAsyncProcess == null
|
||||
? conf.getLong(HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE,
|
||||
DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE)
|
||||
: globalAsyncProcess.maxHeapSizePerRequest;
|
||||
this.maxHeapSizeSubmit =
|
||||
globalAsyncProcess == null
|
||||
? conf.getLong(HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE, DEFAULT_HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE)
|
||||
: globalAsyncProcess.maxHeapSizeSubmit;
|
||||
this.startLogErrorsCnt =
|
||||
conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, DEFAULT_START_LOG_ERRORS_AFTER_COUNT);
|
||||
globalAsyncProcess == null
|
||||
? conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, DEFAULT_START_LOG_ERRORS_AFTER_COUNT)
|
||||
: globalAsyncProcess.startLogErrorsCnt;
|
||||
|
||||
if (this.maxTotalConcurrentTasks <= 0) {
|
||||
throw new IllegalArgumentException("maxTotalConcurrentTasks=" + maxTotalConcurrentTasks);
|
||||
|
@ -387,11 +412,16 @@ class AsyncProcess {
|
|||
|
||||
this.rpcCallerFactory = rpcCaller;
|
||||
this.rpcFactory = rpcFactory;
|
||||
this.logBatchErrorDetails = conf.getBoolean(LOG_DETAILS_FOR_BATCH_ERROR, false);
|
||||
this.logBatchErrorDetails =
|
||||
globalAsyncProcess == null
|
||||
? conf.getBoolean(LOG_DETAILS_FOR_BATCH_ERROR, false)
|
||||
: globalAsyncProcess.logBatchErrorDetails;
|
||||
|
||||
this.thresholdToLogUndoneTaskDetails =
|
||||
conf.getInt(THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS,
|
||||
DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS);
|
||||
globalAsyncProcess == null
|
||||
? conf.getInt(THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS,
|
||||
DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS)
|
||||
: globalAsyncProcess.thresholdToLogUndoneTaskDetails;
|
||||
}
|
||||
|
||||
public void setRpcTimeout(int rpcTimeout) {
|
||||
|
|
|
@ -34,7 +34,6 @@ import java.util.concurrent.atomic.AtomicLong;
|
|||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HConstants; // Needed for write rpc timeout
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
||||
|
@ -112,32 +111,32 @@ public class BufferedMutatorImpl implements BufferedMutator {
|
|||
this.pool = params.getPool();
|
||||
this.listener = params.getListener();
|
||||
|
||||
ConnectionConfiguration tableConf = new ConnectionConfiguration(conf);
|
||||
ConnectionConfiguration connConf = conn.getConnectionConfiguration();
|
||||
if (connConf == null) {
|
||||
// Slow: parse conf in ConnectionConfiguration constructor
|
||||
connConf = new ConnectionConfiguration(conf);
|
||||
}
|
||||
this.writeBufferSize = params.getWriteBufferSize() != BufferedMutatorParams.UNSET ?
|
||||
params.getWriteBufferSize() : tableConf.getWriteBufferSize();
|
||||
params.getWriteBufferSize() : connConf.getWriteBufferSize();
|
||||
|
||||
// Set via the setter because it does value validation and starts/stops the TimerTask
|
||||
long newWriteBufferPeriodicFlushTimeoutMs =
|
||||
params.getWriteBufferPeriodicFlushTimeoutMs() != UNSET
|
||||
? params.getWriteBufferPeriodicFlushTimeoutMs()
|
||||
: tableConf.getWriteBufferPeriodicFlushTimeoutMs();
|
||||
: connConf.getWriteBufferPeriodicFlushTimeoutMs();
|
||||
long newWriteBufferPeriodicFlushTimerTickMs =
|
||||
params.getWriteBufferPeriodicFlushTimerTickMs() != UNSET
|
||||
? params.getWriteBufferPeriodicFlushTimerTickMs()
|
||||
: tableConf.getWriteBufferPeriodicFlushTimerTickMs();
|
||||
: connConf.getWriteBufferPeriodicFlushTimerTickMs();
|
||||
this.setWriteBufferPeriodicFlush(
|
||||
newWriteBufferPeriodicFlushTimeoutMs,
|
||||
newWriteBufferPeriodicFlushTimerTickMs);
|
||||
|
||||
this.maxKeyValueSize = params.getMaxKeyValueSize() != BufferedMutatorParams.UNSET ?
|
||||
params.getMaxKeyValueSize() : tableConf.getMaxKeyValueSize();
|
||||
params.getMaxKeyValueSize() : connConf.getMaxKeyValueSize();
|
||||
|
||||
this.writeRpcTimeout = conn.getConfiguration().getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY,
|
||||
conn.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
|
||||
HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
|
||||
this.operationTimeout = conn.getConfiguration().getInt(
|
||||
HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
|
||||
HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
|
||||
this.writeRpcTimeout = connConf.getWriteRpcTimeout();
|
||||
this.operationTimeout = connConf.getOperationTimeout();
|
||||
// puts need to track errors globally due to how the APIs currently work.
|
||||
ap = new AsyncProcess(connection, conf, pool, rpcCallerFactory, true, rpcFactory, writeRpcTimeout);
|
||||
}
|
||||
|
|
|
@ -12,6 +12,8 @@
|
|||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
|
@ -26,6 +28,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
|||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class ConnectionConfiguration {
|
||||
static final Log LOG = LogFactory.getLog(ConnectionConfiguration.class);
|
||||
|
||||
public static final String WRITE_BUFFER_SIZE_KEY = "hbase.client.write.buffer";
|
||||
public static final long WRITE_BUFFER_SIZE_DEFAULT = 2097152;
|
||||
|
@ -50,6 +53,10 @@ public class ConnectionConfiguration {
|
|||
private final int metaReplicaCallTimeoutMicroSecondScan;
|
||||
private final int retries;
|
||||
private final int maxKeyValueSize;
|
||||
private final int readRpcTimeout;
|
||||
private final int writeRpcTimeout;
|
||||
private final long pause;
|
||||
private final long pauseForCQTBE;
|
||||
|
||||
/**
|
||||
* Constructor
|
||||
|
@ -90,9 +97,28 @@ public class ConnectionConfiguration {
|
|||
HConstants.HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT_DEFAULT);
|
||||
|
||||
this.retries = conf.getInt(
|
||||
HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
|
||||
HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
|
||||
|
||||
this.maxKeyValueSize = conf.getInt(MAX_KEYVALUE_SIZE_KEY, MAX_KEYVALUE_SIZE_DEFAULT);
|
||||
|
||||
this.readRpcTimeout = conf.getInt(HConstants.HBASE_RPC_READ_TIMEOUT_KEY,
|
||||
conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
|
||||
HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
|
||||
|
||||
this.writeRpcTimeout = conf.getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY,
|
||||
conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
|
||||
HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
|
||||
|
||||
this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE, HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
|
||||
long configuredPauseForCQTBE = conf.getLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, pause);
|
||||
if (configuredPauseForCQTBE < pause) {
|
||||
LOG.warn("The " + HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE + " setting: "
|
||||
+ configuredPauseForCQTBE + " is smaller than " + HConstants.HBASE_CLIENT_PAUSE
|
||||
+ ", will use " + pause + " instead.");
|
||||
this.pauseForCQTBE = pause;
|
||||
} else {
|
||||
this.pauseForCQTBE = configuredPauseForCQTBE;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -115,6 +141,10 @@ public class ConnectionConfiguration {
|
|||
HConstants.HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT_DEFAULT;
|
||||
this.retries = HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER;
|
||||
this.maxKeyValueSize = MAX_KEYVALUE_SIZE_DEFAULT;
|
||||
this.readRpcTimeout = HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
|
||||
this.writeRpcTimeout = HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
|
||||
this.pause = HConstants.DEFAULT_HBASE_CLIENT_PAUSE;
|
||||
this.pauseForCQTBE = HConstants.DEFAULT_HBASE_CLIENT_PAUSE;
|
||||
}
|
||||
|
||||
public long getWriteBufferSize() {
|
||||
|
@ -164,4 +194,20 @@ public class ConnectionConfiguration {
|
|||
public long getScannerMaxResultSize() {
|
||||
return scannerMaxResultSize;
|
||||
}
|
||||
|
||||
public int getReadRpcTimeout() {
|
||||
return readRpcTimeout;
|
||||
}
|
||||
|
||||
public int getWriteRpcTimeout() {
|
||||
return writeRpcTimeout;
|
||||
}
|
||||
|
||||
public long getPause() {
|
||||
return pause;
|
||||
}
|
||||
|
||||
public long getPauseForCQTBE() {
|
||||
return pauseForCQTBE;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -669,17 +669,8 @@ class ConnectionManager {
|
|||
this.managed = managed;
|
||||
this.connectionConfig = new ConnectionConfiguration(conf);
|
||||
this.closed = false;
|
||||
this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
|
||||
HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
|
||||
long configuredPauseForCQTBE = conf.getLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, pause);
|
||||
if (configuredPauseForCQTBE < pause) {
|
||||
LOG.warn("The " + HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE + " setting: "
|
||||
+ configuredPauseForCQTBE + " is smaller than " + HConstants.HBASE_CLIENT_PAUSE
|
||||
+ ", will use " + pause + " instead.");
|
||||
this.pauseForCQTBE = pause;
|
||||
} else {
|
||||
this.pauseForCQTBE = configuredPauseForCQTBE;
|
||||
}
|
||||
this.pause = connectionConfig.getPause();
|
||||
this.pauseForCQTBE = connectionConfig.getPauseForCQTBE();
|
||||
this.useMetaReplicas = conf.getBoolean(HConstants.USE_META_REPLICAS,
|
||||
HConstants.DEFAULT_USE_META_REPLICAS);
|
||||
this.metaReplicaCallTimeoutScanInMicroSecond =
|
||||
|
|
|
@ -366,12 +366,8 @@ public class HTable implements HTableInterface, RegionLocator {
|
|||
}
|
||||
this.operationTimeout = tableName.isSystemTable() ?
|
||||
connConfiguration.getMetaOperationTimeout() : connConfiguration.getOperationTimeout();
|
||||
this.readRpcTimeout = configuration.getInt(HConstants.HBASE_RPC_READ_TIMEOUT_KEY,
|
||||
configuration.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
|
||||
HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
|
||||
this.writeRpcTimeout = configuration.getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY,
|
||||
configuration.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
|
||||
HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
|
||||
this.readRpcTimeout = connConfiguration.getReadRpcTimeout();
|
||||
this.writeRpcTimeout = connConfiguration.getWriteRpcTimeout();
|
||||
this.scannerCaching = connConfiguration.getScannerCaching();
|
||||
this.scannerMaxResultSize = connConfiguration.getScannerMaxResultSize();
|
||||
if (this.rpcCallerFactory == null) {
|
||||
|
|
|
@ -162,10 +162,11 @@ public class HConnectionTestingUtility {
|
|||
}
|
||||
NonceGenerator ng = Mockito.mock(NonceGenerator.class);
|
||||
Mockito.when(c.getNonceGenerator()).thenReturn(ng);
|
||||
Mockito.when(c.getAsyncProcess()).thenReturn(
|
||||
new AsyncProcess(c, conf, null, RpcRetryingCallerFactory.instantiate(conf), false,
|
||||
RpcControllerFactory.instantiate(conf), conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
|
||||
HConstants.DEFAULT_HBASE_RPC_TIMEOUT)));
|
||||
AsyncProcess asyncProcess =
|
||||
new AsyncProcess(c, conf, null, RpcRetryingCallerFactory.instantiate(conf), false,
|
||||
RpcControllerFactory.instantiate(conf), conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
|
||||
HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
|
||||
Mockito.when(c.getAsyncProcess()).thenReturn(asyncProcess);
|
||||
Mockito.doNothing().when(c).incCount();
|
||||
Mockito.doNothing().when(c).decCount();
|
||||
Mockito.when(c.getNewRpcRetryingCallerFactory(conf)).thenReturn(
|
||||
|
|
Loading…
Reference in New Issue