HBASE-22550 Throw exception when creating thread pool if the connection has already been closed
This commit is contained in:
parent
492a105e5d
commit
6278c98f5d
|
@ -35,6 +35,7 @@ import java.util.concurrent.ConcurrentHashMap;
|
|||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
@ -113,7 +114,7 @@ class AsyncConnectionImpl implements AsyncConnection {
|
|||
|
||||
private ChoreService authService;
|
||||
|
||||
private volatile boolean closed = false;
|
||||
private final AtomicBoolean closed = new AtomicBoolean(false);
|
||||
|
||||
private final Optional<MetricsConnection> metrics;
|
||||
|
||||
|
@ -188,14 +189,12 @@ class AsyncConnectionImpl implements AsyncConnection {
|
|||
|
||||
@Override
|
||||
public boolean isClosed() {
|
||||
return closed;
|
||||
return closed.get();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
// As the code below is safe to be executed in parallel, here we do not use CAS or lock, just a
|
||||
// simple volatile flag.
|
||||
if (closed) {
|
||||
if (!closed.compareAndSet(false, true)) {
|
||||
return;
|
||||
}
|
||||
IOUtils.closeQuietly(clusterStatusListener);
|
||||
|
@ -209,7 +208,6 @@ class AsyncConnectionImpl implements AsyncConnection {
|
|||
if (c != null) {
|
||||
c.closePool();
|
||||
}
|
||||
closed = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -18,18 +18,25 @@
|
|||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.log.HBaseMarkers;
|
||||
import org.apache.hadoop.hbase.util.ConcurrentMapUtils.IOExceptionSupplier;
|
||||
import org.apache.hadoop.hbase.util.FutureUtils;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
|
||||
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
|
||||
/**
|
||||
* The connection implementation based on {@link AsyncConnection}.
|
||||
|
@ -41,6 +48,10 @@ class ConnectionOverAsyncConnection implements Connection {
|
|||
|
||||
private volatile boolean aborted = false;
|
||||
|
||||
// only used for executing coprocessor calls, as users may reference the methods in the
|
||||
// BlockingInterface of the protobuf stub so we have to execute the call in a separated thread...
|
||||
// Will be removed in 4.0.0 along with the deprecated coprocessor methods in Table and Admin
|
||||
// interface.
|
||||
private volatile ExecutorService batchPool = null;
|
||||
|
||||
private final AsyncConnectionImpl conn;
|
||||
|
@ -121,7 +132,7 @@ class ConnectionOverAsyncConnection implements Connection {
|
|||
|
||||
// will be called from AsyncConnection, to avoid infinite loop as in the above method we will call
|
||||
// AsyncConnection.close.
|
||||
void closePool() {
|
||||
synchronized void closePool() {
|
||||
ExecutorService batchPool = this.batchPool;
|
||||
if (batchPool != null) {
|
||||
ConnectionUtils.shutdownPool(batchPool);
|
||||
|
@ -134,13 +145,36 @@ class ConnectionOverAsyncConnection implements Connection {
|
|||
return conn.isClosed();
|
||||
}
|
||||
|
||||
private ExecutorService getBatchPool() {
|
||||
// only used for executing coprocessor calls, as users may reference the methods in the
|
||||
// BlockingInterface of the protobuf stub so we have to execute the call in a separated thread...
|
||||
// Will be removed in 4.0.0 along with the deprecated coprocessor methods in Table and Admin
|
||||
// interface.
|
||||
private ThreadPoolExecutor createThreadPool() {
|
||||
Configuration conf = conn.getConfiguration();
|
||||
int threads = conf.getInt("hbase.hconnection.threads.max", 256);
|
||||
long keepAliveTime = conf.getLong("hbase.hconnection.threads.keepalivetime", 60);
|
||||
BlockingQueue<Runnable> workQueue =
|
||||
new LinkedBlockingQueue<>(threads * conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
|
||||
HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS));
|
||||
ThreadPoolExecutor tpe = new ThreadPoolExecutor(threads, threads, keepAliveTime,
|
||||
TimeUnit.SECONDS, workQueue,
|
||||
new ThreadFactoryBuilder().setDaemon(true).setNameFormat(toString() + "-shared-%d").build());
|
||||
tpe.allowCoreThreadTimeOut(true);
|
||||
return tpe;
|
||||
}
|
||||
|
||||
// only used for executing coprocessor calls, as users may reference the methods in the
|
||||
// BlockingInterface of the protobuf stub so we have to execute the call in a separated thread...
|
||||
// Will be removed in 4.0.0 along with the deprecated coprocessor methods in Table and Admin
|
||||
// interface.
|
||||
private ExecutorService getBatchPool() throws IOException {
|
||||
if (batchPool == null) {
|
||||
synchronized (this) {
|
||||
if (isClosed()) {
|
||||
throw new DoNotRetryIOException("Connection is closed");
|
||||
}
|
||||
if (batchPool == null) {
|
||||
int threads = conn.getConfiguration().getInt("hbase.hconnection.threads.max", 256);
|
||||
this.batchPool = ConnectionUtils.getThreadPool(conn.getConfiguration(), threads, threads,
|
||||
() -> toString() + "-shared", null);
|
||||
this.batchPool = createThreadPool();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -153,13 +187,14 @@ class ConnectionOverAsyncConnection implements Connection {
|
|||
|
||||
@Override
|
||||
public Table build() {
|
||||
ExecutorService p = pool != null ? pool : getBatchPool();
|
||||
IOExceptionSupplier<ExecutorService> poolSupplier =
|
||||
pool != null ? () -> pool : ConnectionOverAsyncConnection.this::getBatchPool;
|
||||
return new TableOverAsyncTable(conn,
|
||||
conn.getTableBuilder(tableName).setRpcTimeout(rpcTimeout, TimeUnit.MILLISECONDS)
|
||||
.setReadRpcTimeout(readRpcTimeout, TimeUnit.MILLISECONDS)
|
||||
.setWriteRpcTimeout(writeRpcTimeout, TimeUnit.MILLISECONDS)
|
||||
.setOperationTimeout(operationTimeout, TimeUnit.MILLISECONDS).build(),
|
||||
p);
|
||||
poolSupplier);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
|
|
@ -29,12 +29,9 @@ import java.net.UnknownHostException;
|
|||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.Function;
|
||||
|
@ -54,7 +51,6 @@ import org.apache.hadoop.hbase.ipc.HBaseRpcController;
|
|||
import org.apache.hadoop.hbase.ipc.ServerRpcController;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.ReflectionUtils;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
import org.apache.hadoop.net.DNS;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
@ -646,29 +642,6 @@ public final class ConnectionUtils {
|
|||
return future;
|
||||
}
|
||||
|
||||
static ThreadPoolExecutor getThreadPool(Configuration conf, int maxThreads, int coreThreads,
|
||||
Supplier<String> threadName, BlockingQueue<Runnable> passedWorkQueue) {
|
||||
// shared HTable thread executor not yet initialized
|
||||
if (maxThreads == 0) {
|
||||
maxThreads = Runtime.getRuntime().availableProcessors() * 8;
|
||||
}
|
||||
if (coreThreads == 0) {
|
||||
coreThreads = Runtime.getRuntime().availableProcessors() * 8;
|
||||
}
|
||||
long keepAliveTime = conf.getLong("hbase.hconnection.threads.keepalivetime", 60);
|
||||
BlockingQueue<Runnable> workQueue = passedWorkQueue;
|
||||
if (workQueue == null) {
|
||||
workQueue =
|
||||
new LinkedBlockingQueue<>(maxThreads * conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
|
||||
HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS));
|
||||
coreThreads = maxThreads;
|
||||
}
|
||||
ThreadPoolExecutor tpe = new ThreadPoolExecutor(coreThreads, maxThreads, keepAliveTime,
|
||||
TimeUnit.SECONDS, workQueue, Threads.newDaemonThreadFactory(threadName.get()));
|
||||
tpe.allowCoreThreadTimeOut(true);
|
||||
return tpe;
|
||||
}
|
||||
|
||||
static void shutdownPool(ExecutorService pool) {
|
||||
pool.shutdown();
|
||||
try {
|
||||
|
|
|
@ -55,6 +55,7 @@ import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
|
|||
import org.apache.hadoop.hbase.io.TimeRange;
|
||||
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.ConcurrentMapUtils.IOExceptionSupplier;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.hbase.util.FutureUtils;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
|
@ -76,12 +77,13 @@ class TableOverAsyncTable implements Table {
|
|||
|
||||
private final AsyncTable<?> table;
|
||||
|
||||
private final ExecutorService pool;
|
||||
private final IOExceptionSupplier<ExecutorService> poolSupplier;
|
||||
|
||||
TableOverAsyncTable(AsyncConnectionImpl conn, AsyncTable<?> table, ExecutorService pool) {
|
||||
TableOverAsyncTable(AsyncConnectionImpl conn, AsyncTable<?> table,
|
||||
IOExceptionSupplier<ExecutorService> poolSupplier) {
|
||||
this.conn = conn;
|
||||
this.table = table;
|
||||
this.pool = pool;
|
||||
this.poolSupplier = poolSupplier;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -423,6 +425,7 @@ class TableOverAsyncTable implements Table {
|
|||
private <R> void coprocssorService(String serviceName, byte[] startKey, byte[] endKey,
|
||||
Callback<R> callback, StubCall<R> call) throws Throwable {
|
||||
// get regions covered by the row range
|
||||
ExecutorService pool = this.poolSupplier.get();
|
||||
List<byte[]> keys = getStartKeysInRange(startKey, endKey);
|
||||
Map<byte[], Future<R>> futures = new TreeMap<>(Bytes.BYTES_COMPARATOR);
|
||||
try {
|
||||
|
@ -443,7 +446,7 @@ class TableOverAsyncTable implements Table {
|
|||
}
|
||||
} catch (RejectedExecutionException e) {
|
||||
// maybe the connection has been closed, let's check
|
||||
if (pool.isShutdown()) {
|
||||
if (conn.isClosed()) {
|
||||
throw new DoNotRetryIOException("Connection is closed", e);
|
||||
} else {
|
||||
throw new HBaseIOException("Coprocessor operation is rejected", e);
|
||||
|
|
|
@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals;
|
|||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import com.google.protobuf.ServiceException;
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
@ -30,6 +31,7 @@ import java.util.concurrent.atomic.AtomicReference;
|
|||
import java.util.stream.Collectors;
|
||||
import java.util.stream.IntStream;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
|
@ -37,7 +39,11 @@ import org.apache.hadoop.hbase.HRegionLocation;
|
|||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.Waiter;
|
||||
import org.apache.hadoop.hbase.client.coprocessor.Batch;
|
||||
import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
|
||||
import org.apache.hadoop.hbase.ipc.RpcClient;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MultiRowMutationService;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MutateRowsResponse;
|
||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
|
@ -336,7 +342,7 @@ public class TestConnection {
|
|||
TEST_UTIL.getAdmin().createTable(builder.build());
|
||||
|
||||
try (Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
|
||||
RegionLocator locator = conn.getRegionLocator(tableName)) {
|
||||
RegionLocator locator = conn.getRegionLocator(tableName)) {
|
||||
// Get locations of the regions of the table
|
||||
List<HRegionLocation> locations = locator.getAllRegionLocations();
|
||||
|
||||
|
@ -353,4 +359,27 @@ public class TestConnection {
|
|||
TEST_UTIL.deleteTable(tableName);
|
||||
}
|
||||
}
|
||||
|
||||
@Test(expected = DoNotRetryIOException.class)
|
||||
public void testClosedConnection() throws ServiceException, Throwable {
|
||||
byte[] family = Bytes.toBytes("cf");
|
||||
TableName tableName = TableName.valueOf(name.getMethodName());
|
||||
TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName)
|
||||
.setCoprocessor(MultiRowMutationEndpoint.class.getName())
|
||||
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(family));
|
||||
TEST_UTIL.getAdmin().createTable(builder.build());
|
||||
|
||||
Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
|
||||
// cache the location
|
||||
try (Table table = conn.getTable(tableName)) {
|
||||
table.get(new Get(Bytes.toBytes(0)));
|
||||
} finally {
|
||||
conn.close();
|
||||
}
|
||||
Batch.Call<MultiRowMutationService, MutateRowsResponse> callable = service -> {
|
||||
throw new RuntimeException("Should not arrive here");
|
||||
};
|
||||
conn.getTable(tableName).coprocessorService(MultiRowMutationService.class,
|
||||
HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, callable);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue