HBASE-17491 Remove all setters from HTable interface and introduce a TableBuilder to build Table instance

This commit is contained in:
Yu Li 2017-01-23 13:51:03 +08:00
parent 7754a9620e
commit 07e0a30efa
8 changed files with 222 additions and 34 deletions

View File

@ -83,7 +83,9 @@ public interface Connection extends Abortable, Closeable {
* @param tableName the name of the table * @param tableName the name of the table
* @return a Table to use for interactions with this table * @return a Table to use for interactions with this table
*/ */
Table getTable(TableName tableName) throws IOException; default Table getTable(TableName tableName) throws IOException {
return getTable(tableName, null);
}
/** /**
* Retrieve a Table implementation for accessing a table. * Retrieve a Table implementation for accessing a table.
@ -102,7 +104,9 @@ public interface Connection extends Abortable, Closeable {
* @param pool The thread pool to use for batch operations, null to use a default pool. * @param pool The thread pool to use for batch operations, null to use a default pool.
* @return a Table to use for interactions with this table * @return a Table to use for interactions with this table
*/ */
Table getTable(TableName tableName, ExecutorService pool) throws IOException; default Table getTable(TableName tableName, ExecutorService pool) throws IOException {
return getTableBuilder(tableName, pool).build();
}
/** /**
* <p> * <p>
@ -173,4 +177,11 @@ public interface Connection extends Abortable, Closeable {
* @return true if this connection is closed * @return true if this connection is closed
*/ */
boolean isClosed(); boolean isClosed();
/**
* Returns an {@link TableBuilder} for creating {@link Table}.
* @param tableName the name of the table
* @param pool the thread pool to use for requests like batch and scan
*/
TableBuilder getTableBuilder(TableName tableName, ExecutorService pool);
} }

View File

@ -42,6 +42,7 @@ public class ConnectionConfiguration {
private final int replicaCallTimeoutMicroSecondScan; private final int replicaCallTimeoutMicroSecondScan;
private final int retries; private final int retries;
private final int maxKeyValueSize; private final int maxKeyValueSize;
private final int rpcTimeout;
private final int readRpcTimeout; private final int readRpcTimeout;
private final int writeRpcTimeout; private final int writeRpcTimeout;
// toggle for async/sync prefetch // toggle for async/sync prefetch
@ -82,6 +83,9 @@ public class ConnectionConfiguration {
this.maxKeyValueSize = conf.getInt(MAX_KEYVALUE_SIZE_KEY, MAX_KEYVALUE_SIZE_DEFAULT); this.maxKeyValueSize = conf.getInt(MAX_KEYVALUE_SIZE_KEY, MAX_KEYVALUE_SIZE_DEFAULT);
this.rpcTimeout =
conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
this.readRpcTimeout = conf.getInt(HConstants.HBASE_RPC_READ_TIMEOUT_KEY, this.readRpcTimeout = conf.getInt(HConstants.HBASE_RPC_READ_TIMEOUT_KEY,
conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
@ -108,6 +112,7 @@ public class ConnectionConfiguration {
this.maxKeyValueSize = MAX_KEYVALUE_SIZE_DEFAULT; this.maxKeyValueSize = MAX_KEYVALUE_SIZE_DEFAULT;
this.readRpcTimeout = HConstants.DEFAULT_HBASE_RPC_TIMEOUT; this.readRpcTimeout = HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
this.writeRpcTimeout = HConstants.DEFAULT_HBASE_RPC_TIMEOUT; this.writeRpcTimeout = HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
this.rpcTimeout = HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
} }
public int getReadRpcTimeout() { public int getReadRpcTimeout() {
@ -158,4 +163,8 @@ public class ConnectionConfiguration {
return clientScannerAsyncPrefetch; return clientScannerAsyncPrefetch;
} }
public int getRpcTimeout() {
return rpcTimeout;
}
} }

View File

@ -327,9 +327,15 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
} }
@Override @Override
public Table getTable(TableName tableName, ExecutorService pool) throws IOException { public TableBuilder getTableBuilder(TableName tableName, ExecutorService pool) {
return new HTable(tableName, this, connectionConfig, return new TableBuilderBase(tableName, connectionConfig) {
rpcCallerFactory, rpcControllerFactory, pool);
@Override
public Table build() {
return new HTable(ConnectionImplementation.this, this, rpcCallerFactory,
rpcControllerFactory, pool);
}
};
} }
@Override @Override

View File

@ -82,10 +82,9 @@ import org.apache.hadoop.hbase.util.Threads;
* Obtain an instance via {@link Connection}. See {@link ConnectionFactory} * Obtain an instance via {@link Connection}. See {@link ConnectionFactory}
* class comment for an example of how. * class comment for an example of how.
* *
* <p>This class is NOT thread safe for reads nor writes. * <p>This class is thread safe since 2.0.0 if not invoking any of the setter methods.
* In the case of writes (Put, Delete), the underlying write buffer can * All setters are moved into {@link TableBuilder} and reserved here only for keeping
* be corrupted if multiple threads contend over a single HTable instance. * backward compatibility, and TODO will be removed soon.
* In the case of reads, some fields used by a Scan are shared among all threads.
* *
* <p>HTable is no longer a client API. Use {@link Table} instead. It is marked * <p>HTable is no longer a client API. Use {@link Table} instead. It is marked
* InterfaceAudience.Private indicating that this is an HBase-internal class as defined in * InterfaceAudience.Private indicating that this is an HBase-internal class as defined in
@ -115,10 +114,12 @@ public class HTable implements Table {
private final long scannerMaxResultSize; private final long scannerMaxResultSize;
private final ExecutorService pool; // For Multi & Scan private final ExecutorService pool; // For Multi & Scan
private int operationTimeout; // global timeout for each blocking method with retrying rpc private int operationTimeout; // global timeout for each blocking method with retrying rpc
private final int rpcTimeout; // FIXME we should use this for rpc like batch and checkAndXXX
private int readRpcTimeout; // timeout for each read rpc request private int readRpcTimeout; // timeout for each read rpc request
private int writeRpcTimeout; // timeout for each write rpc request private int writeRpcTimeout; // timeout for each write rpc request
private final boolean cleanupPoolOnClose; // shutdown the pool in close() private final boolean cleanupPoolOnClose; // shutdown the pool in close()
private final HRegionLocator locator; private final HRegionLocator locator;
private final long writeBufferSize;
/** The Async process for batch */ /** The Async process for batch */
@VisibleForTesting @VisibleForTesting
@ -150,31 +151,24 @@ public class HTable implements Table {
* Creates an object to access a HBase table. * Creates an object to access a HBase table.
* Used by HBase internally. DO NOT USE. See {@link ConnectionFactory} class comment for how to * Used by HBase internally. DO NOT USE. See {@link ConnectionFactory} class comment for how to
* get a {@link Table} instance (use {@link Table} instead of {@link HTable}). * get a {@link Table} instance (use {@link Table} instead of {@link HTable}).
* @param tableName Name of the table.
* @param connection Connection to be used. * @param connection Connection to be used.
* @param builder The table builder
* @param rpcCallerFactory The RPC caller factory
* @param rpcControllerFactory The RPC controller factory
* @param pool ExecutorService to be used. * @param pool ExecutorService to be used.
* @throws IOException if a remote or network exception occurs
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
protected HTable(TableName tableName, final ClusterConnection connection, protected HTable(final ClusterConnection connection,
final ConnectionConfiguration tableConfig, final TableBuilderBase builder,
final RpcRetryingCallerFactory rpcCallerFactory, final RpcRetryingCallerFactory rpcCallerFactory,
final RpcControllerFactory rpcControllerFactory, final RpcControllerFactory rpcControllerFactory,
final ExecutorService pool) throws IOException { final ExecutorService pool) {
if (connection == null || connection.isClosed()) { if (connection == null || connection.isClosed()) {
throw new IllegalArgumentException("Connection is null or closed."); throw new IllegalArgumentException("Connection is null or closed.");
} }
if (tableName == null) {
throw new IllegalArgumentException("Given table name is null");
}
this.tableName = tableName;
this.connection = connection; this.connection = connection;
this.configuration = connection.getConfiguration(); this.configuration = connection.getConfiguration();
if (tableConfig == null) { this.connConfiguration = connection.getConnectionConfiguration();
connConfiguration = new ConnectionConfiguration(configuration);
} else {
connConfiguration = tableConfig;
}
if (pool == null) { if (pool == null) {
this.pool = getDefaultExecutor(this.configuration); this.pool = getDefaultExecutor(this.configuration);
this.cleanupPoolOnClose = true; this.cleanupPoolOnClose = true;
@ -194,10 +188,12 @@ public class HTable implements Table {
this.rpcControllerFactory = rpcControllerFactory; this.rpcControllerFactory = rpcControllerFactory;
} }
this.operationTimeout = tableName.isSystemTable() ? this.tableName = builder.tableName;
connConfiguration.getMetaOperationTimeout() : connConfiguration.getOperationTimeout(); this.operationTimeout = builder.operationTimeout;
this.readRpcTimeout = connConfiguration.getReadRpcTimeout(); this.rpcTimeout = builder.rpcTimeout;
this.writeRpcTimeout = connConfiguration.getWriteRpcTimeout(); this.readRpcTimeout = builder.readRpcTimeout;
this.writeRpcTimeout = builder.writeRpcTimeout;
this.writeBufferSize = builder.writeBufferSize;
this.scannerCaching = connConfiguration.getScannerCaching(); this.scannerCaching = connConfiguration.getScannerCaching();
this.scannerMaxResultSize = connConfiguration.getScannerMaxResultSize(); this.scannerMaxResultSize = connConfiguration.getScannerMaxResultSize();
@ -215,15 +211,16 @@ public class HTable implements Table {
connection = conn; connection = conn;
this.tableName = mutator.getName(); this.tableName = mutator.getName();
this.configuration = connection.getConfiguration(); this.configuration = connection.getConfiguration();
connConfiguration = new ConnectionConfiguration(configuration); connConfiguration = connection.getConnectionConfiguration();
cleanupPoolOnClose = false; cleanupPoolOnClose = false;
this.mutator = mutator; this.mutator = mutator;
this.operationTimeout = tableName.isSystemTable() ? this.operationTimeout = connConfiguration.getOperationTimeout();
connConfiguration.getMetaOperationTimeout() : connConfiguration.getOperationTimeout(); this.rpcTimeout = connConfiguration.getRpcTimeout();
this.readRpcTimeout = connConfiguration.getReadRpcTimeout(); this.readRpcTimeout = connConfiguration.getReadRpcTimeout();
this.writeRpcTimeout = connConfiguration.getWriteRpcTimeout(); this.writeRpcTimeout = connConfiguration.getWriteRpcTimeout();
this.scannerCaching = connConfiguration.getScannerCaching(); this.scannerCaching = connConfiguration.getScannerCaching();
this.scannerMaxResultSize = connConfiguration.getScannerMaxResultSize(); this.scannerMaxResultSize = connConfiguration.getScannerMaxResultSize();
this.writeBufferSize = connConfiguration.getWriteBufferSize();
this.rpcControllerFactory = null; this.rpcControllerFactory = null;
this.rpcCallerFactory = null; this.rpcCallerFactory = null;
this.pool = mutator.getPool(); this.pool = mutator.getPool();
@ -1058,6 +1055,7 @@ public class HTable implements Table {
* @throws IOException if a remote or network exception occurs. * @throws IOException if a remote or network exception occurs.
*/ */
@Override @Override
@Deprecated
public void setWriteBufferSize(long writeBufferSize) throws IOException { public void setWriteBufferSize(long writeBufferSize) throws IOException {
getBufferedMutator(); getBufferedMutator();
mutator.setWriteBufferSize(writeBufferSize); mutator.setWriteBufferSize(writeBufferSize);
@ -1162,6 +1160,7 @@ public class HTable implements Table {
} }
@Override @Override
@Deprecated
public void setOperationTimeout(int operationTimeout) { public void setOperationTimeout(int operationTimeout) {
this.operationTimeout = operationTimeout; this.operationTimeout = operationTimeout;
if (mutator != null) { if (mutator != null) {
@ -1177,7 +1176,7 @@ public class HTable implements Table {
@Override @Override
@Deprecated @Deprecated
public int getRpcTimeout() { public int getRpcTimeout() {
return readRpcTimeout; return rpcTimeout;
} }
@Override @Override
@ -1193,6 +1192,7 @@ public class HTable implements Table {
} }
@Override @Override
@Deprecated
public void setWriteRpcTimeout(int writeRpcTimeout) { public void setWriteRpcTimeout(int writeRpcTimeout) {
this.writeRpcTimeout = writeRpcTimeout; this.writeRpcTimeout = writeRpcTimeout;
if (mutator != null) { if (mutator != null) {
@ -1204,6 +1204,7 @@ public class HTable implements Table {
public int getReadRpcTimeout() { return readRpcTimeout; } public int getReadRpcTimeout() { return readRpcTimeout; }
@Override @Override
@Deprecated
public void setReadRpcTimeout(int readRpcTimeout) { public void setReadRpcTimeout(int readRpcTimeout) {
this.readRpcTimeout = readRpcTimeout; this.readRpcTimeout = readRpcTimeout;
} }
@ -1335,7 +1336,7 @@ public class HTable implements Table {
this.mutator = (BufferedMutatorImpl) connection.getBufferedMutator( this.mutator = (BufferedMutatorImpl) connection.getBufferedMutator(
new BufferedMutatorParams(tableName) new BufferedMutatorParams(tableName)
.pool(pool) .pool(pool)
.writeBufferSize(connConfiguration.getWriteBufferSize()) .writeBufferSize(writeBufferSize)
.maxKeyValueSize(connConfiguration.getMaxKeyValueSize()) .maxKeyValueSize(connConfiguration.getMaxKeyValueSize())
.opertationTimeout(operationTimeout) .opertationTimeout(operationTimeout)
.rpcTimeout(writeRpcTimeout) .rpcTimeout(writeRpcTimeout)

View File

@ -593,7 +593,9 @@ public interface Table extends Closeable {
* total time being blocking reach the operation timeout before retries exhausted, it will break * total time being blocking reach the operation timeout before retries exhausted, it will break
* early and throw SocketTimeoutException. * early and throw SocketTimeoutException.
* @param operationTimeout the total timeout of each operation in millisecond. * @param operationTimeout the total timeout of each operation in millisecond.
* @deprecated since 2.0.0, use {@link TableBuilder#setOperationTimeout} instead
*/ */
@Deprecated
void setOperationTimeout(int operationTimeout); void setOperationTimeout(int operationTimeout);
/** /**
@ -637,7 +639,9 @@ public interface Table extends Closeable {
* until retries exhausted or operation timeout reached. * until retries exhausted or operation timeout reached.
* *
* @param readRpcTimeout * @param readRpcTimeout
* @deprecated since 2.0.0, use {@link TableBuilder#setReadRpcTimeout} instead
*/ */
@Deprecated
void setReadRpcTimeout(int readRpcTimeout); void setReadRpcTimeout(int readRpcTimeout);
/** /**
@ -652,6 +656,8 @@ public interface Table extends Closeable {
* until retries exhausted or operation timeout reached. * until retries exhausted or operation timeout reached.
* *
* @param writeRpcTimeout * @param writeRpcTimeout
* @deprecated since 2.0.0, use {@link TableBuilder#setWriteRpcTimeout} instead
*/ */
@Deprecated
void setWriteRpcTimeout(int writeRpcTimeout); void setWriteRpcTimeout(int writeRpcTimeout);
} }

View File

@ -0,0 +1,71 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
/**
* For creating {@link Table} instance.
* <p>
* The implementation should have default configurations set before returning the builder to user.
* So users are free to only set the configurations they care about to create a new
* Table instance.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public interface TableBuilder {
/**
* Set timeout for a whole operation such as get, put or delete. Notice that scan will not be
* effected by this value, see scanTimeoutNs.
* <p>
* Operation timeout and max attempt times(or max retry times) are both limitations for retrying,
* we will stop retrying when we reach any of the limitations.
*/
TableBuilder setOperationTimeout(int timeout);
/**
* Set timeout for each rpc request.
* <p>
* Notice that this will <strong>NOT</strong> change the rpc timeout for read(get, scan) request
* and write request(put, delete).
*/
TableBuilder setRpcTimeout(int timeout);
/**
* Set timeout for each read(get, scan) rpc request.
*/
TableBuilder setReadRpcTimeout(int timeout);
/**
* Set timeout for each write(put, delete) rpc request.
*/
TableBuilder setWriteRpcTimeout(int timeout);
/**
* Set the write buffer size which by default is specified by the
* {@code hbase.client.write.buffer} setting.
*/
TableBuilder setWriteBufferSize(long writeBufferSize);
/**
* Create the {@link Table} instance.
*/
Table build();
}

View File

@ -0,0 +1,83 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
/**
* Base class for all table builders.
*/
@InterfaceAudience.Private
abstract class TableBuilderBase implements TableBuilder {
protected TableName tableName;
protected int operationTimeout;
protected int rpcTimeout;
protected int readRpcTimeout;
protected int writeRpcTimeout;
protected long writeBufferSize;
TableBuilderBase(TableName tableName, ConnectionConfiguration connConf) {
if (tableName == null) {
throw new IllegalArgumentException("Given table name is null");
}
this.tableName = tableName;
this.operationTimeout = tableName.isSystemTable() ? connConf.getMetaOperationTimeout()
: connConf.getOperationTimeout();
this.rpcTimeout = connConf.getRpcTimeout();
this.readRpcTimeout = connConf.getReadRpcTimeout();
this.writeRpcTimeout = connConf.getWriteRpcTimeout();
this.writeBufferSize = connConf.getWriteBufferSize();
}
@Override
public TableBuilderBase setOperationTimeout(int timeout) {
this.operationTimeout = timeout;
return this;
}
@Override
public TableBuilderBase setRpcTimeout(int timeout) {
this.rpcTimeout = timeout;
return this;
}
@Override
public TableBuilderBase setReadRpcTimeout(int timeout) {
this.readRpcTimeout = timeout;
return this;
}
@Override
public TableBuilderBase setWriteRpcTimeout(int timeout) {
this.writeRpcTimeout = timeout;
return this;
}
@Override
public TableBuilder setWriteBufferSize(long writeBufferSize) {
this.writeBufferSize = writeBufferSize;
return this;
}
}

View File

@ -21,7 +21,7 @@ import java.util.concurrent.ExecutorService
import scala.util.Random import scala.util.Random
import org.apache.hadoop.hbase.client.{BufferedMutator, Table, RegionLocator, import org.apache.hadoop.hbase.client.{BufferedMutator, Table, RegionLocator,
Connection, BufferedMutatorParams, Admin} Connection, BufferedMutatorParams, Admin, TableBuilder}
import org.apache.hadoop.conf.Configuration import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.TableName import org.apache.hadoop.hbase.TableName
import org.apache.spark.Logging import org.apache.spark.Logging
@ -50,6 +50,7 @@ class ConnectionMocker extends Connection {
def getBufferedMutator (params: BufferedMutatorParams): BufferedMutator = null def getBufferedMutator (params: BufferedMutatorParams): BufferedMutator = null
def getBufferedMutator (tableName: TableName): BufferedMutator = null def getBufferedMutator (tableName: TableName): BufferedMutator = null
def getAdmin: Admin = null def getAdmin: Admin = null
def getTableBuilder(tableName: TableName, pool: ExecutorService): TableBuilder = null
def close(): Unit = { def close(): Unit = {
if (isClosed) if (isClosed)