diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Connection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Connection.java index b979c6a432f..a8cd296ab98 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Connection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Connection.java @@ -83,7 +83,9 @@ public interface Connection extends Abortable, Closeable { * @param tableName the name of the table * @return a Table to use for interactions with this table */ - Table getTable(TableName tableName) throws IOException; + default Table getTable(TableName tableName) throws IOException { + return getTable(tableName, null); + } /** * Retrieve a Table implementation for accessing a table. @@ -102,7 +104,9 @@ public interface Connection extends Abortable, Closeable { * @param pool The thread pool to use for batch operations, null to use a default pool. * @return a Table to use for interactions with this table */ - Table getTable(TableName tableName, ExecutorService pool) throws IOException; + default Table getTable(TableName tableName, ExecutorService pool) throws IOException { + return getTableBuilder(tableName, pool).build(); + } /** *
@@ -173,4 +177,11 @@ public interface Connection extends Abortable, Closeable { * @return true if this connection is closed */ boolean isClosed(); + + /** + * Returns an {@link TableBuilder} for creating {@link Table}. + * @param tableName the name of the table + * @param pool the thread pool to use for requests like batch and scan + */ + TableBuilder getTableBuilder(TableName tableName, ExecutorService pool); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java index 41f5baf75f8..bea91dad60c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java @@ -42,9 +42,10 @@ public class ConnectionConfiguration { private final int replicaCallTimeoutMicroSecondScan; private final int retries; private final int maxKeyValueSize; + private final int rpcTimeout; private final int readRpcTimeout; private final int writeRpcTimeout; - // toggle for async/sync prefetch + // toggle for async/sync prefetch private final boolean clientScannerAsyncPrefetch; /** @@ -82,6 +83,9 @@ public class ConnectionConfiguration { this.maxKeyValueSize = conf.getInt(MAX_KEYVALUE_SIZE_KEY, MAX_KEYVALUE_SIZE_DEFAULT); + this.rpcTimeout = + conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT); + this.readRpcTimeout = conf.getInt(HConstants.HBASE_RPC_READ_TIMEOUT_KEY, conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); @@ -108,6 +112,7 @@ public class ConnectionConfiguration { this.maxKeyValueSize = MAX_KEYVALUE_SIZE_DEFAULT; this.readRpcTimeout = HConstants.DEFAULT_HBASE_RPC_TIMEOUT; this.writeRpcTimeout = HConstants.DEFAULT_HBASE_RPC_TIMEOUT; + this.rpcTimeout = HConstants.DEFAULT_HBASE_RPC_TIMEOUT; } public int getReadRpcTimeout() { @@ -158,4 +163,8 @@ public class ConnectionConfiguration { return clientScannerAsyncPrefetch; } + public int getRpcTimeout() { + return rpcTimeout; + } + } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java index adbc7f94b8d..ca213650535 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java @@ -327,9 +327,15 @@ class ConnectionImplementation implements ClusterConnection, Closeable { } @Override - public Table getTable(TableName tableName, ExecutorService pool) throws IOException { - return new HTable(tableName, this, connectionConfig, - rpcCallerFactory, rpcControllerFactory, pool); + public TableBuilder getTableBuilder(TableName tableName, ExecutorService pool) { + return new TableBuilderBase(tableName, connectionConfig) { + + @Override + public Table build() { + return new HTable(ConnectionImplementation.this, this, rpcCallerFactory, + rpcControllerFactory, pool); + } + }; } @Override diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index fd5eda39b4d..3bb0a778b85 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -82,10 +82,9 @@ import org.apache.hadoop.hbase.util.Threads; * Obtain an instance via {@link Connection}. See {@link ConnectionFactory} * class comment for an example of how. * - *
This class is NOT thread safe for reads nor writes. - * In the case of writes (Put, Delete), the underlying write buffer can - * be corrupted if multiple threads contend over a single HTable instance. - * In the case of reads, some fields used by a Scan are shared among all threads. + *
This class is thread safe since 2.0.0 if not invoking any of the setter methods. + * All setters are moved into {@link TableBuilder} and reserved here only for keeping + * backward compatibility, and TODO will be removed soon. * *
HTable is no longer a client API. Use {@link Table} instead. It is marked * InterfaceAudience.Private indicating that this is an HBase-internal class as defined in @@ -115,10 +114,12 @@ public class HTable implements Table { private final long scannerMaxResultSize; private final ExecutorService pool; // For Multi & Scan private int operationTimeout; // global timeout for each blocking method with retrying rpc + private final int rpcTimeout; // FIXME we should use this for rpc like batch and checkAndXXX private int readRpcTimeout; // timeout for each read rpc request private int writeRpcTimeout; // timeout for each write rpc request private final boolean cleanupPoolOnClose; // shutdown the pool in close() private final HRegionLocator locator; + private final long writeBufferSize; /** The Async process for batch */ @VisibleForTesting @@ -150,31 +151,24 @@ public class HTable implements Table { * Creates an object to access a HBase table. * Used by HBase internally. DO NOT USE. See {@link ConnectionFactory} class comment for how to * get a {@link Table} instance (use {@link Table} instead of {@link HTable}). - * @param tableName Name of the table. * @param connection Connection to be used. + * @param builder The table builder + * @param rpcCallerFactory The RPC caller factory + * @param rpcControllerFactory The RPC controller factory * @param pool ExecutorService to be used. - * @throws IOException if a remote or network exception occurs */ @InterfaceAudience.Private - protected HTable(TableName tableName, final ClusterConnection connection, - final ConnectionConfiguration tableConfig, + protected HTable(final ClusterConnection connection, + final TableBuilderBase builder, final RpcRetryingCallerFactory rpcCallerFactory, final RpcControllerFactory rpcControllerFactory, - final ExecutorService pool) throws IOException { + final ExecutorService pool) { if (connection == null || connection.isClosed()) { throw new IllegalArgumentException("Connection is null or closed."); } - if (tableName == null) { - throw new IllegalArgumentException("Given table name is null"); - } - this.tableName = tableName; this.connection = connection; this.configuration = connection.getConfiguration(); - if (tableConfig == null) { - connConfiguration = new ConnectionConfiguration(configuration); - } else { - connConfiguration = tableConfig; - } + this.connConfiguration = connection.getConnectionConfiguration(); if (pool == null) { this.pool = getDefaultExecutor(this.configuration); this.cleanupPoolOnClose = true; @@ -194,10 +188,12 @@ public class HTable implements Table { this.rpcControllerFactory = rpcControllerFactory; } - this.operationTimeout = tableName.isSystemTable() ? - connConfiguration.getMetaOperationTimeout() : connConfiguration.getOperationTimeout(); - this.readRpcTimeout = connConfiguration.getReadRpcTimeout(); - this.writeRpcTimeout = connConfiguration.getWriteRpcTimeout(); + this.tableName = builder.tableName; + this.operationTimeout = builder.operationTimeout; + this.rpcTimeout = builder.rpcTimeout; + this.readRpcTimeout = builder.readRpcTimeout; + this.writeRpcTimeout = builder.writeRpcTimeout; + this.writeBufferSize = builder.writeBufferSize; this.scannerCaching = connConfiguration.getScannerCaching(); this.scannerMaxResultSize = connConfiguration.getScannerMaxResultSize(); @@ -215,15 +211,16 @@ public class HTable implements Table { connection = conn; this.tableName = mutator.getName(); this.configuration = connection.getConfiguration(); - connConfiguration = new ConnectionConfiguration(configuration); + connConfiguration = connection.getConnectionConfiguration(); cleanupPoolOnClose = false; this.mutator = mutator; - this.operationTimeout = tableName.isSystemTable() ? - connConfiguration.getMetaOperationTimeout() : connConfiguration.getOperationTimeout(); + this.operationTimeout = connConfiguration.getOperationTimeout(); + this.rpcTimeout = connConfiguration.getRpcTimeout(); this.readRpcTimeout = connConfiguration.getReadRpcTimeout(); this.writeRpcTimeout = connConfiguration.getWriteRpcTimeout(); this.scannerCaching = connConfiguration.getScannerCaching(); this.scannerMaxResultSize = connConfiguration.getScannerMaxResultSize(); + this.writeBufferSize = connConfiguration.getWriteBufferSize(); this.rpcControllerFactory = null; this.rpcCallerFactory = null; this.pool = mutator.getPool(); @@ -1058,6 +1055,7 @@ public class HTable implements Table { * @throws IOException if a remote or network exception occurs. */ @Override + @Deprecated public void setWriteBufferSize(long writeBufferSize) throws IOException { getBufferedMutator(); mutator.setWriteBufferSize(writeBufferSize); @@ -1162,6 +1160,7 @@ public class HTable implements Table { } @Override + @Deprecated public void setOperationTimeout(int operationTimeout) { this.operationTimeout = operationTimeout; if (mutator != null) { @@ -1177,7 +1176,7 @@ public class HTable implements Table { @Override @Deprecated public int getRpcTimeout() { - return readRpcTimeout; + return rpcTimeout; } @Override @@ -1193,6 +1192,7 @@ public class HTable implements Table { } @Override + @Deprecated public void setWriteRpcTimeout(int writeRpcTimeout) { this.writeRpcTimeout = writeRpcTimeout; if (mutator != null) { @@ -1204,6 +1204,7 @@ public class HTable implements Table { public int getReadRpcTimeout() { return readRpcTimeout; } @Override + @Deprecated public void setReadRpcTimeout(int readRpcTimeout) { this.readRpcTimeout = readRpcTimeout; } @@ -1335,7 +1336,7 @@ public class HTable implements Table { this.mutator = (BufferedMutatorImpl) connection.getBufferedMutator( new BufferedMutatorParams(tableName) .pool(pool) - .writeBufferSize(connConfiguration.getWriteBufferSize()) + .writeBufferSize(writeBufferSize) .maxKeyValueSize(connConfiguration.getMaxKeyValueSize()) .opertationTimeout(operationTimeout) .rpcTimeout(writeRpcTimeout) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java index 0f30cb45eb6..90fee8d1332 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java @@ -593,7 +593,9 @@ public interface Table extends Closeable { * total time being blocking reach the operation timeout before retries exhausted, it will break * early and throw SocketTimeoutException. * @param operationTimeout the total timeout of each operation in millisecond. + * @deprecated since 2.0.0, use {@link TableBuilder#setOperationTimeout} instead */ + @Deprecated void setOperationTimeout(int operationTimeout); /** @@ -637,7 +639,9 @@ public interface Table extends Closeable { * until retries exhausted or operation timeout reached. * * @param readRpcTimeout + * @deprecated since 2.0.0, use {@link TableBuilder#setReadRpcTimeout} instead */ + @Deprecated void setReadRpcTimeout(int readRpcTimeout); /** @@ -652,6 +656,8 @@ public interface Table extends Closeable { * until retries exhausted or operation timeout reached. * * @param writeRpcTimeout + * @deprecated since 2.0.0, use {@link TableBuilder#setWriteRpcTimeout} instead */ + @Deprecated void setWriteRpcTimeout(int writeRpcTimeout); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableBuilder.java new file mode 100644 index 00000000000..27e15962bdb --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableBuilder.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +/** + * For creating {@link Table} instance. + *
+ * The implementation should have default configurations set before returning the builder to user. + * So users are free to only set the configurations they care about to create a new + * Table instance. + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public interface TableBuilder { + + /** + * Set timeout for a whole operation such as get, put or delete. Notice that scan will not be + * effected by this value, see scanTimeoutNs. + *
+ * Operation timeout and max attempt times(or max retry times) are both limitations for retrying, + * we will stop retrying when we reach any of the limitations. + */ + TableBuilder setOperationTimeout(int timeout); + + /** + * Set timeout for each rpc request. + *
+ * Notice that this will NOT change the rpc timeout for read(get, scan) request + * and write request(put, delete). + */ + TableBuilder setRpcTimeout(int timeout); + + /** + * Set timeout for each read(get, scan) rpc request. + */ + TableBuilder setReadRpcTimeout(int timeout); + + /** + * Set timeout for each write(put, delete) rpc request. + */ + TableBuilder setWriteRpcTimeout(int timeout); + + /** + * Set the write buffer size which by default is specified by the + * {@code hbase.client.write.buffer} setting. + */ + TableBuilder setWriteBufferSize(long writeBufferSize); + + /** + * Create the {@link Table} instance. + */ + Table build(); +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableBuilderBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableBuilderBase.java new file mode 100644 index 00000000000..adf1abb2a3e --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableBuilderBase.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * Base class for all table builders. + */ +@InterfaceAudience.Private +abstract class TableBuilderBase implements TableBuilder { + + protected TableName tableName; + + protected int operationTimeout; + + protected int rpcTimeout; + + protected int readRpcTimeout; + + protected int writeRpcTimeout; + + protected long writeBufferSize; + + TableBuilderBase(TableName tableName, ConnectionConfiguration connConf) { + if (tableName == null) { + throw new IllegalArgumentException("Given table name is null"); + } + this.tableName = tableName; + this.operationTimeout = tableName.isSystemTable() ? connConf.getMetaOperationTimeout() + : connConf.getOperationTimeout(); + this.rpcTimeout = connConf.getRpcTimeout(); + this.readRpcTimeout = connConf.getReadRpcTimeout(); + this.writeRpcTimeout = connConf.getWriteRpcTimeout(); + this.writeBufferSize = connConf.getWriteBufferSize(); + } + + @Override + public TableBuilderBase setOperationTimeout(int timeout) { + this.operationTimeout = timeout; + return this; + } + + @Override + public TableBuilderBase setRpcTimeout(int timeout) { + this.rpcTimeout = timeout; + return this; + } + + @Override + public TableBuilderBase setReadRpcTimeout(int timeout) { + this.readRpcTimeout = timeout; + return this; + } + + @Override + public TableBuilderBase setWriteRpcTimeout(int timeout) { + this.writeRpcTimeout = timeout; + return this; + } + + @Override + public TableBuilder setWriteBufferSize(long writeBufferSize) { + this.writeBufferSize = writeBufferSize; + return this; + } +} diff --git a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCacheSuite.scala b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCacheSuite.scala index 6ebf0449a47..b3fdd4edfbf 100644 --- a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCacheSuite.scala +++ b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCacheSuite.scala @@ -21,7 +21,7 @@ import java.util.concurrent.ExecutorService import scala.util.Random import org.apache.hadoop.hbase.client.{BufferedMutator, Table, RegionLocator, - Connection, BufferedMutatorParams, Admin} + Connection, BufferedMutatorParams, Admin, TableBuilder} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.hbase.TableName import org.apache.spark.Logging @@ -50,6 +50,7 @@ class ConnectionMocker extends Connection { def getBufferedMutator (params: BufferedMutatorParams): BufferedMutator = null def getBufferedMutator (tableName: TableName): BufferedMutator = null def getAdmin: Admin = null + def getTableBuilder(tableName: TableName, pool: ExecutorService): TableBuilder = null def close(): Unit = { if (isClosed)