HBASE-22526 RejectedExecutionException could be thrown from TableOverAsyncTable.coprocessor service if the connection has been shutown

This commit is contained in:
zhangduo 2019-06-02 21:54:29 +08:00
parent 1a5c2a0257
commit 492a105e5d
1 changed files with 25 additions and 13 deletions

View File

@ -38,11 +38,14 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.apache.commons.lang3.ArrayUtils; import org.apache.commons.lang3.ArrayUtils;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CompareOperator; import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
@ -422,20 +425,29 @@ class TableOverAsyncTable implements Table {
// get regions covered by the row range // get regions covered by the row range
List<byte[]> keys = getStartKeysInRange(startKey, endKey); List<byte[]> keys = getStartKeysInRange(startKey, endKey);
Map<byte[], Future<R>> futures = new TreeMap<>(Bytes.BYTES_COMPARATOR); Map<byte[], Future<R>> futures = new TreeMap<>(Bytes.BYTES_COMPARATOR);
for (byte[] r : keys) { try {
RegionCoprocessorRpcChannel channel = coprocessorService(r); for (byte[] r : keys) {
Future<R> future = pool.submit(new Callable<R>() { RegionCoprocessorRpcChannel channel = coprocessorService(r);
@Override Future<R> future = pool.submit(new Callable<R>() {
public R call() throws Exception { @Override
R result = call.call(channel); public R call() throws Exception {
byte[] region = channel.getLastRegion(); R result = call.call(channel);
if (callback != null) { byte[] region = channel.getLastRegion();
callback.update(region, r, result); if (callback != null) {
callback.update(region, r, result);
}
return result;
} }
return result; });
} futures.put(r, future);
}); }
futures.put(r, future); } catch (RejectedExecutionException e) {
// maybe the connection has been closed, let's check
if (pool.isShutdown()) {
throw new DoNotRetryIOException("Connection is closed", e);
} else {
throw new HBaseIOException("Coprocessor operation is rejected", e);
}
} }
for (Map.Entry<byte[], Future<R>> e : futures.entrySet()) { for (Map.Entry<byte[], Future<R>> e : futures.entrySet()) {
try { try {