diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java index 84e1da635fa..78fad9ea235 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java @@ -35,6 +35,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; @@ -113,7 +114,7 @@ class AsyncConnectionImpl implements AsyncConnection { private ChoreService authService; - private volatile boolean closed = false; + private final AtomicBoolean closed = new AtomicBoolean(false); private final Optional metrics; @@ -188,14 +189,12 @@ class AsyncConnectionImpl implements AsyncConnection { @Override public boolean isClosed() { - return closed; + return closed.get(); } @Override public void close() { - // As the code below is safe to be executed in parallel, here we do not use CAS or lock, just a - // simple volatile flag. - if (closed) { + if (!closed.compareAndSet(false, true)) { return; } IOUtils.closeQuietly(clusterStatusListener); @@ -209,7 +208,6 @@ class AsyncConnectionImpl implements AsyncConnection { if (c != null) { c.closePool(); } - closed = true; } @Override diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionOverAsyncConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionOverAsyncConnection.java index 861aab086d4..b61cef5c910 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionOverAsyncConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionOverAsyncConnection.java @@ -18,18 +18,25 @@ package org.apache.hadoop.hbase.client; import java.io.IOException; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.log.HBaseMarkers; +import org.apache.hadoop.hbase.util.ConcurrentMapUtils.IOExceptionSupplier; import org.apache.hadoop.hbase.util.FutureUtils; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hbase.thirdparty.com.google.common.io.Closeables; +import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; /** * The connection implementation based on {@link AsyncConnection}. @@ -41,6 +48,10 @@ class ConnectionOverAsyncConnection implements Connection { private volatile boolean aborted = false; + // only used for executing coprocessor calls, as users may reference the methods in the + // BlockingInterface of the protobuf stub so we have to execute the call in a separated thread... + // Will be removed in 4.0.0 along with the deprecated coprocessor methods in Table and Admin + // interface. private volatile ExecutorService batchPool = null; private final AsyncConnectionImpl conn; @@ -121,7 +132,7 @@ class ConnectionOverAsyncConnection implements Connection { // will be called from AsyncConnection, to avoid infinite loop as in the above method we will call // AsyncConnection.close. - void closePool() { + synchronized void closePool() { ExecutorService batchPool = this.batchPool; if (batchPool != null) { ConnectionUtils.shutdownPool(batchPool); @@ -134,13 +145,36 @@ class ConnectionOverAsyncConnection implements Connection { return conn.isClosed(); } - private ExecutorService getBatchPool() { + // only used for executing coprocessor calls, as users may reference the methods in the + // BlockingInterface of the protobuf stub so we have to execute the call in a separated thread... + // Will be removed in 4.0.0 along with the deprecated coprocessor methods in Table and Admin + // interface. + private ThreadPoolExecutor createThreadPool() { + Configuration conf = conn.getConfiguration(); + int threads = conf.getInt("hbase.hconnection.threads.max", 256); + long keepAliveTime = conf.getLong("hbase.hconnection.threads.keepalivetime", 60); + BlockingQueue workQueue = + new LinkedBlockingQueue<>(threads * conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS, + HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS)); + ThreadPoolExecutor tpe = new ThreadPoolExecutor(threads, threads, keepAliveTime, + TimeUnit.SECONDS, workQueue, + new ThreadFactoryBuilder().setDaemon(true).setNameFormat(toString() + "-shared-%d").build()); + tpe.allowCoreThreadTimeOut(true); + return tpe; + } + + // only used for executing coprocessor calls, as users may reference the methods in the + // BlockingInterface of the protobuf stub so we have to execute the call in a separated thread... + // Will be removed in 4.0.0 along with the deprecated coprocessor methods in Table and Admin + // interface. + private ExecutorService getBatchPool() throws IOException { if (batchPool == null) { synchronized (this) { + if (isClosed()) { + throw new DoNotRetryIOException("Connection is closed"); + } if (batchPool == null) { - int threads = conn.getConfiguration().getInt("hbase.hconnection.threads.max", 256); - this.batchPool = ConnectionUtils.getThreadPool(conn.getConfiguration(), threads, threads, - () -> toString() + "-shared", null); + this.batchPool = createThreadPool(); } } } @@ -153,13 +187,14 @@ class ConnectionOverAsyncConnection implements Connection { @Override public Table build() { - ExecutorService p = pool != null ? pool : getBatchPool(); + IOExceptionSupplier poolSupplier = + pool != null ? () -> pool : ConnectionOverAsyncConnection.this::getBatchPool; return new TableOverAsyncTable(conn, conn.getTableBuilder(tableName).setRpcTimeout(rpcTimeout, TimeUnit.MILLISECONDS) .setReadRpcTimeout(readRpcTimeout, TimeUnit.MILLISECONDS) .setWriteRpcTimeout(writeRpcTimeout, TimeUnit.MILLISECONDS) .setOperationTimeout(operationTimeout, TimeUnit.MILLISECONDS).build(), - p); + poolSupplier); } }; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java index d6334c288a1..999bcc54c57 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java @@ -29,12 +29,9 @@ import java.net.UnknownHostException; import java.util.Arrays; import java.util.List; import java.util.Optional; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; @@ -54,7 +51,6 @@ import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.ipc.ServerRpcController; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ReflectionUtils; -import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.net.DNS; import org.apache.yetus.audience.InterfaceAudience; @@ -646,29 +642,6 @@ public final class ConnectionUtils { return future; } - static ThreadPoolExecutor getThreadPool(Configuration conf, int maxThreads, int coreThreads, - Supplier threadName, BlockingQueue passedWorkQueue) { - // shared HTable thread executor not yet initialized - if (maxThreads == 0) { - maxThreads = Runtime.getRuntime().availableProcessors() * 8; - } - if (coreThreads == 0) { - coreThreads = Runtime.getRuntime().availableProcessors() * 8; - } - long keepAliveTime = conf.getLong("hbase.hconnection.threads.keepalivetime", 60); - BlockingQueue workQueue = passedWorkQueue; - if (workQueue == null) { - workQueue = - new LinkedBlockingQueue<>(maxThreads * conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS, - HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS)); - coreThreads = maxThreads; - } - ThreadPoolExecutor tpe = new ThreadPoolExecutor(coreThreads, maxThreads, keepAliveTime, - TimeUnit.SECONDS, workQueue, Threads.newDaemonThreadFactory(threadName.get())); - tpe.allowCoreThreadTimeOut(true); - return tpe; - } - static void shutdownPool(ExecutorService pool) { pool.shutdown(); try { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableOverAsyncTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableOverAsyncTable.java index 5686b09c8b1..0a2a66eecae 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableOverAsyncTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableOverAsyncTable.java @@ -55,6 +55,7 @@ import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback; import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.ConcurrentMapUtils.IOExceptionSupplier; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FutureUtils; import org.apache.hadoop.hbase.util.Pair; @@ -76,12 +77,13 @@ class TableOverAsyncTable implements Table { private final AsyncTable table; - private final ExecutorService pool; + private final IOExceptionSupplier poolSupplier; - TableOverAsyncTable(AsyncConnectionImpl conn, AsyncTable table, ExecutorService pool) { + TableOverAsyncTable(AsyncConnectionImpl conn, AsyncTable table, + IOExceptionSupplier poolSupplier) { this.conn = conn; this.table = table; - this.pool = pool; + this.poolSupplier = poolSupplier; } @Override @@ -423,6 +425,7 @@ class TableOverAsyncTable implements Table { private void coprocssorService(String serviceName, byte[] startKey, byte[] endKey, Callback callback, StubCall call) throws Throwable { // get regions covered by the row range + ExecutorService pool = this.poolSupplier.get(); List keys = getStartKeysInRange(startKey, endKey); Map> futures = new TreeMap<>(Bytes.BYTES_COMPARATOR); try { @@ -443,7 +446,7 @@ class TableOverAsyncTable implements Table { } } catch (RejectedExecutionException e) { // maybe the connection has been closed, let's check - if (pool.isShutdown()) { + if (conn.isClosed()) { throw new DoNotRetryIOException("Connection is closed", e); } else { throw new HBaseIOException("Coprocessor operation is rejected", e); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestConnection.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestConnection.java index 8dd4709c1c9..df715c2376e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestConnection.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestConnection.java @@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import com.google.protobuf.ServiceException; import java.io.IOException; import java.util.List; import java.util.Set; @@ -30,6 +31,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import java.util.stream.IntStream; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; @@ -37,7 +39,11 @@ import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Waiter; +import org.apache.hadoop.hbase.client.coprocessor.Batch; +import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint; import org.apache.hadoop.hbase.ipc.RpcClient; +import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MultiRowMutationService; +import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MutateRowsResponse; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @@ -336,7 +342,7 @@ public class TestConnection { TEST_UTIL.getAdmin().createTable(builder.build()); try (Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration()); - RegionLocator locator = conn.getRegionLocator(tableName)) { + RegionLocator locator = conn.getRegionLocator(tableName)) { // Get locations of the regions of the table List locations = locator.getAllRegionLocations(); @@ -353,4 +359,27 @@ public class TestConnection { TEST_UTIL.deleteTable(tableName); } } + + @Test(expected = DoNotRetryIOException.class) + public void testClosedConnection() throws ServiceException, Throwable { + byte[] family = Bytes.toBytes("cf"); + TableName tableName = TableName.valueOf(name.getMethodName()); + TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName) + .setCoprocessor(MultiRowMutationEndpoint.class.getName()) + .setColumnFamily(ColumnFamilyDescriptorBuilder.of(family)); + TEST_UTIL.getAdmin().createTable(builder.build()); + + Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration()); + // cache the location + try (Table table = conn.getTable(tableName)) { + table.get(new Get(Bytes.toBytes(0))); + } finally { + conn.close(); + } + Batch.Call callable = service -> { + throw new RuntimeException("Should not arrive here"); + }; + conn.getTable(tableName).coprocessorService(MultiRowMutationService.class, + HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, callable); + } }