HBASE-12128 Cache configuration and RpcController selection for Table in Connection

This commit is contained in:
Enis Soztutar 2014-11-25 13:47:36 -08:00
parent c0cdaf8400
commit b9dfcd01b8
3 changed files with 166 additions and 35 deletions

View File

@ -563,6 +563,10 @@ class ConnectionManager {
private final Configuration conf;
// cache the configuration value for tables so that we can avoid calling
// the expensive Configuration to fetch the value multiple times.
private final TableConfiguration tableConfig;
// Client rpc instance.
private RpcClient rpcClient;
@ -642,11 +646,11 @@ class ConnectionManager {
*/
protected HConnectionImplementation(Configuration conf) {
this.conf = conf;
this.tableConfig = new TableConfiguration(conf);
this.closed = false;
this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
this.numTries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
this.numTries = tableConfig.getRetriesNumber();
this.rpcTimeout = conf.getInt(
HConstants.HBASE_RPC_TIMEOUT_KEY,
HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
@ -695,7 +699,7 @@ class ConnectionManager {
if (managed) {
throw new IOException("The connection has to be unmanaged.");
}
return new HTable(tableName, this, pool);
return new HTable(tableName, this, tableConfig, rpcCallerFactory, rpcControllerFactory, pool);
}
@Override
@ -703,7 +707,8 @@ class ConnectionManager {
if (managed) {
throw new IOException("The connection has to be unmanaged.");
}
return new HTable(tableName, this, getBatchPool());
return new HTable(
tableName, this, tableConfig, rpcCallerFactory, rpcControllerFactory, getBatchPool());
}
@Override

View File

@ -110,23 +110,19 @@ public class HTable implements HTableInterface, RegionLocator {
protected ClusterConnection connection;
private final TableName tableName;
private volatile Configuration configuration;
private TableConfiguration tableConfiguration;
protected List<Row> writeAsyncBuffer = new LinkedList<Row>();
private long writeBufferSize;
private boolean clearBufferOnFail;
private boolean autoFlush;
protected long currentWriteBufferSize;
protected int scannerCaching;
private int maxKeyValueSize;
private ExecutorService pool; // For Multi & Scan
private boolean closed;
private int operationTimeout;
private int retries;
private final boolean cleanupPoolOnClose; // shutdown the pool in close()
private final boolean cleanupConnectionOnClose; // close the connection in close()
private Consistency defaultConsistency = Consistency.STRONG;
private int primaryCallTimeoutMicroSecond;
private int replicaCallTimeoutMicroSecondScan;
/** The Async process for puts with autoflush set to false or multiputs */
protected AsyncProcess ap;
@ -284,7 +280,7 @@ public class HTable implements HTableInterface, RegionLocator {
@Deprecated
public HTable(TableName tableName, final Connection connection,
final ExecutorService pool) throws IOException {
this(tableName, (ClusterConnection)connection, pool);
this(tableName, (ClusterConnection)connection, null, null, null, pool);
}
/**
@ -298,6 +294,9 @@ public class HTable implements HTableInterface, RegionLocator {
*/
@InterfaceAudience.Private
public HTable(TableName tableName, final ClusterConnection connection,
final TableConfiguration tableConfig,
final RpcRetryingCallerFactory rpcCallerFactory,
final RpcControllerFactory rpcControllerFactory,
final ExecutorService pool) throws IOException {
if (connection == null || connection.isClosed()) {
throw new IllegalArgumentException("Connection is null or closed.");
@ -306,6 +305,7 @@ public class HTable implements HTableInterface, RegionLocator {
this.cleanupConnectionOnClose = false;
this.connection = connection;
this.configuration = connection.getConfiguration();
this.tableConfiguration = tableConfig;
this.pool = pool;
if (pool == null) {
this.pool = getDefaultExecutor(this.configuration);
@ -314,6 +314,9 @@ public class HTable implements HTableInterface, RegionLocator {
this.cleanupPoolOnClose = false;
}
this.rpcCallerFactory = rpcCallerFactory;
this.rpcControllerFactory = rpcControllerFactory;
this.finishSetup();
}
@ -323,6 +326,7 @@ public class HTable implements HTableInterface, RegionLocator {
@VisibleForTesting
protected HTable() {
tableName = null;
tableConfiguration = new TableConfiguration();
cleanupPoolOnClose = false;
cleanupConnectionOnClose = false;
}
@ -338,33 +342,29 @@ public class HTable implements HTableInterface, RegionLocator {
* setup this HTable's parameter based on the passed configuration
*/
private void finishSetup() throws IOException {
if (tableConfiguration == null) {
tableConfiguration = new TableConfiguration(configuration);
}
this.operationTimeout = tableName.isSystemTable() ?
this.configuration.getInt(HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT,
HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT):
this.configuration.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
this.writeBufferSize = this.configuration.getLong(
"hbase.client.write.buffer", 2097152);
tableConfiguration.getMetaOperationTimeout() : tableConfiguration.getOperationTimeout();
this.writeBufferSize = tableConfiguration.getWriteBufferSize();
this.clearBufferOnFail = true;
this.autoFlush = true;
this.currentWriteBufferSize = 0;
this.scannerCaching = this.configuration.getInt(
HConstants.HBASE_CLIENT_SCANNER_CACHING,
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
this.primaryCallTimeoutMicroSecond =
this.configuration.getInt("hbase.client.primaryCallTimeout.get", 10000); // 10 ms
this.replicaCallTimeoutMicroSecondScan =
this.configuration.getInt("hbase.client.replicaCallTimeout.scan", 1000000); // 1000 ms
this.retries = configuration.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
this.scannerCaching = tableConfiguration.getScannerCaching();
if (this.rpcCallerFactory == null) {
this.rpcCallerFactory = connection.getNewRpcRetryingCallerFactory(configuration);
}
if (this.rpcControllerFactory == null) {
this.rpcControllerFactory = RpcControllerFactory.instantiate(configuration);
}
this.rpcCallerFactory = connection.getNewRpcRetryingCallerFactory(configuration);
this.rpcControllerFactory = RpcControllerFactory.instantiate(configuration);
// puts need to track errors globally due to how the APIs currently work.
ap = new AsyncProcess(connection, configuration, pool, rpcCallerFactory, true, rpcControllerFactory);
multiAp = this.connection.getAsyncProcess();
this.maxKeyValueSize = getMaxKeyValueSize(this.configuration);
this.closed = false;
}
@ -773,22 +773,22 @@ public class HTable implements HTableInterface, RegionLocator {
if (scan.isSmall()) {
return new ClientSmallReversedScanner(getConfiguration(), scan, getName(),
this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
pool, replicaCallTimeoutMicroSecondScan);
pool, tableConfiguration.getReplicaCallTimeoutMicroSecondScan());
} else {
return new ReversedClientScanner(getConfiguration(), scan, getName(),
this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
pool, replicaCallTimeoutMicroSecondScan);
pool, tableConfiguration.getReplicaCallTimeoutMicroSecondScan());
}
}
if (scan.isSmall()) {
return new ClientSmallScanner(getConfiguration(), scan, getName(),
this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
pool, replicaCallTimeoutMicroSecondScan);
pool, tableConfiguration.getReplicaCallTimeoutMicroSecondScan());
} else {
return new ClientScanner(getConfiguration(), scan, getName(), this.connection,
this.rpcCallerFactory, this.rpcControllerFactory,
pool, replicaCallTimeoutMicroSecondScan);
pool, tableConfiguration.getReplicaCallTimeoutMicroSecondScan());
}
}
@ -849,8 +849,10 @@ public class HTable implements HTableInterface, RegionLocator {
// Call that takes into account the replica
RpcRetryingCallerWithReadReplicas callable = new RpcRetryingCallerWithReadReplicas(
rpcControllerFactory, tableName, this.connection, get, pool, retries,
operationTimeout, primaryCallTimeoutMicroSecond);
rpcControllerFactory, tableName, this.connection, get, pool,
tableConfiguration.getRetriesNumber(),
operationTimeout,
tableConfiguration.getPrimaryCallTimeoutMicroSecond());
return callable.call();
}
@ -1503,7 +1505,7 @@ public class HTable implements HTableInterface, RegionLocator {
// validate for well-formedness
public void validatePut(final Put put) throws IllegalArgumentException {
validatePut(put, maxKeyValueSize);
validatePut(put, tableConfiguration.getMaxKeyValueSize());
}
// validate for well-formedness

View File

@ -0,0 +1,124 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
* law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
* for the specific language governing permissions and limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import com.google.common.annotations.VisibleForTesting;
/**
*
* Configuration is a heavy weight registry that does a lot of string operations and regex matching.
* Method calls into Configuration account for high CPU usage and have huge performance impact.
* This class caches the value in the TableConfiguration object to improve performance.
* see HBASE-12128
*
*/
@InterfaceAudience.Private
public class TableConfiguration {
private final long writeBufferSize;
private final int metaOperationTimeout;
private final int operationTimeout;
private final int scannerCaching;
private final int primaryCallTimeoutMicroSecond;
private final int replicaCallTimeoutMicroSecondScan;
private final int retries;
private final int maxKeyValueSize;
/**
* Constructor
* @param conf Configuration object
*/
TableConfiguration(Configuration conf) {
this.writeBufferSize = conf.getLong("hbase.client.write.buffer", 2097152);
this.metaOperationTimeout = conf.getInt(
HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT,
HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
this.operationTimeout = conf.getInt(
HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
this.scannerCaching = conf.getInt(
HConstants.HBASE_CLIENT_SCANNER_CACHING, HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
this.primaryCallTimeoutMicroSecond =
conf.getInt("hbase.client.primaryCallTimeout.get", 10000); // 10ms
this.replicaCallTimeoutMicroSecondScan =
conf.getInt("hbase.client.replicaCallTimeout.scan", 1000000); // 1000 ms
this.retries = conf.getInt(
HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
this.maxKeyValueSize = conf.getInt("hbase.client.keyvalue.maxsize", -1);
}
/**
* Constructor
* This is for internal testing purpose (using the default value).
* In real usage, we should read the configuration from the Configuration object.
*/
@VisibleForTesting
protected TableConfiguration() {
this.writeBufferSize = 2097152;
this.metaOperationTimeout = HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT;
this.operationTimeout = HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT;
this.scannerCaching = HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING;
this.primaryCallTimeoutMicroSecond = 10000;
this.replicaCallTimeoutMicroSecondScan = 1000000;
this.retries = HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER;
this.maxKeyValueSize = -1;
}
public long getWriteBufferSize() {
return writeBufferSize;
}
public int getMetaOperationTimeout() {
return metaOperationTimeout;
}
public int getOperationTimeout() {
return operationTimeout;
}
public int getScannerCaching() {
return scannerCaching;
}
public int getPrimaryCallTimeoutMicroSecond() {
return primaryCallTimeoutMicroSecond;
}
public int getReplicaCallTimeoutMicroSecondScan() {
return replicaCallTimeoutMicroSecondScan;
}
public int getRetriesNumber() {
return retries;
}
public int getMaxKeyValueSize() {
return maxKeyValueSize;
}
}