remove thread pool

This commit is contained in:
Jing Yu 2023-06-05 15:15:33 -07:00
parent 13bcb544f2
commit 407e4bb04e
4 changed files with 64 additions and 93 deletions

View File

@ -29,8 +29,6 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@ -1512,18 +1510,18 @@ public interface AsyncAdmin {
* channel -> xxxService.newStub(channel)
* </pre>
*
* @param stubMaker a delegation to the actual {@code newStub} call.
* @param callable a delegation to the actual protobuf rpc call. See the comment of
* {@link ServiceCaller} for more details.
* @param stubMaker a delegation to the actual {@code newStub} call.
* @param callable a delegation to the actual protobuf rpc call. See the comment of
* {@link ServiceCaller} for more details.
* @param serverNames the given list of region servers
* @param <S> the type of the asynchronous stub
* @param <R> the type of the return value
* @return a list of return values of the protobuf rpc call, wrapped by a {@link CompletableFuture}.
* @param <S> the type of the asynchronous stub
* @param <R> the type of the return value
* @return Map of each region server to its result of the protobuf rpc call, wrapped by a
* {@link CompletableFuture}.
* @see ServiceCaller
*/
<S, R> CompletableFuture<List<R>> coprocessorService(Function<RpcChannel, S> stubMaker,
ServiceCaller<S, R> callable, List<ServerName> serverNames) throws ExecutionException,
InterruptedException, TimeoutException;
<S, R> CompletableFuture<Map<ServerName, Object>> coprocessorService(
Function<RpcChannel, S> stubMaker, ServiceCaller<S, R> callable, List<ServerName> serverNames);
/**
* List all the dead region servers.

View File

@ -23,9 +23,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.regex.Pattern;
import org.apache.hadoop.hbase.CacheEvictionStats;
@ -809,9 +807,8 @@ class AsyncHBaseAdmin implements AsyncAdmin {
}
@Override
public <S, R> CompletableFuture<List<R>> coprocessorService(Function<RpcChannel, S> stubMaker,
ServiceCaller<S, R> callable, List<ServerName> serverNames)
throws ExecutionException, InterruptedException, TimeoutException {
public <S, R> CompletableFuture<Map<ServerName, Object>> coprocessorService(
Function<RpcChannel, S> stubMaker, ServiceCaller<S, R> callable, List<ServerName> serverNames) {
return wrap(rawAdmin.coprocessorService(stubMaker, callable, serverNames));
}

View File

@ -36,11 +36,7 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
@ -99,8 +95,8 @@ import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -3511,40 +3507,27 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
}
@Override
public <S, R> CompletableFuture<List<R>> coprocessorService(Function<RpcChannel, S> stubMaker,
ServiceCaller<S, R> callable, List<ServerName> serverNames)
throws ExecutionException, InterruptedException, TimeoutException {
List<S> stubs = new ArrayList<>(serverNames.size());
for(ServerName serverName : serverNames) {
RegionServerCoprocessorRpcChannelImpl channel = new RegionServerCoprocessorRpcChannelImpl(
this.<Message> newServerCaller().serverName(serverName));
S stub = stubMaker.apply(channel);
stubs.add(stub);
}
return CompletableFuture.supplyAsync(() -> {
ExecutorService executorService = Executors.newFixedThreadPool(serverNames.size(),
new ThreadFactoryBuilder().setNameFormat("coproc-service-%d").setDaemon(true).build());
List<CompletableFuture<R>> completableFutureList = new ArrayList<>();
for (S stub : stubs) {
CompletableFuture<R> future = new CompletableFuture<>();
completableFutureList.add(future);
ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController();
executorService.execute(() -> callable.call(stub, controller, resp -> {
if (controller.failed()) {
future.completeExceptionally(controller.getFailed());
public <S, R> CompletableFuture<Map<ServerName, Object>> coprocessorService(
Function<RpcChannel, S> stubMaker, ServiceCaller<S, R> callable, List<ServerName> serverNames) {
CompletableFuture<Map<ServerName, Object>> future = new CompletableFuture<>();
Map<ServerName, Object> resultMap = new HashMap<>();
for (ServerName rs : serverNames) {
FutureUtils.addListener(coprocessorService(stubMaker, callable, rs), (r, e) -> {
boolean done;
synchronized (resultMap) {
if (e != null) {
resultMap.put(rs, e);
} else {
future.complete(resp);
resultMap.put(rs, r);
}
}));
}
List<R> listValues = new ArrayList<>(serverNames.size());
for(CompletableFuture<R> completableFuture : completableFutureList) {
listValues.add(completableFuture.join());
}
executorService.shutdownNow();
return listValues;
});
done = resultMap.size() == serverNames.size();
}
if (done) {
future.complete(resultMap);
}
});
}
return future;
}
@Override

View File

@ -19,12 +19,13 @@ package org.apache.hadoop.hbase.coprocessor;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.FileNotFoundException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
@ -46,7 +47,6 @@ import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.Service;
import org.apache.hadoop.hbase.shaded.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos;
import org.apache.hadoop.hbase.shaded.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyRequest;
import org.apache.hadoop.hbase.shaded.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyResponse;
import org.apache.hadoop.hbase.shaded.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyService;
@ -76,54 +76,47 @@ public class TestAsyncRegionServersCoprocessorEndpoint extends TestAsyncAdminBas
}
@Test
public void testRegionServerCoprocessorServiceWithMultipleServers() throws Exception {
final List<JVMClusterUtil.RegionServerThread> regionServerThreads = TEST_UTIL.getHBaseCluster().getRegionServerThreads();
public void testRegionServersCoprocessorService()
throws ExecutionException, InterruptedException {
final List<JVMClusterUtil.RegionServerThread> regionServerThreads =
TEST_UTIL.getHBaseCluster().getRegionServerThreads();
List<ServerName> serverNames = new ArrayList<>();
for (JVMClusterUtil.RegionServerThread t : regionServerThreads) {
serverNames.add(t.getRegionServer().getServerName());
}
DummyRegionServerEndpointProtos.DummyRequest request =
DummyRegionServerEndpointProtos.DummyRequest.getDefaultInstance();
List<DummyResponse> responses =
admin.<DummyRegionServerEndpointProtos.DummyService.Stub,
DummyRegionServerEndpointProtos.DummyResponse> coprocessorService(
DummyRegionServerEndpointProtos.DummyService::newStub,
(s, c, done) -> s.dummyCall(c, request, done), serverNames)
.get();
regionServerThreads.forEach(t -> serverNames.add(t.getRegionServer().getServerName()));
assertEquals(responses.size(), serverNames.size());
for (DummyResponse response : responses) {
assertEquals(DUMMY_VALUE, response.getValue());
}
DummyRequest request = DummyRequest.getDefaultInstance();
Map<ServerName, Object> resultMap =
admin.<DummyService.Stub, DummyResponse> coprocessorService(DummyService::newStub,
(s, c, done) -> s.dummyCall(c, request, done), serverNames).get();
resultMap.forEach((k, v) -> {
assertTrue(v instanceof DummyResponse);
DummyResponse resp = (DummyResponse) v;
assertEquals(DUMMY_VALUE, resp.getValue());
});
}
@Test
public void testRegionServerCoprocessorServiceErrorWithMultipleServers() throws Exception {
final List<JVMClusterUtil.RegionServerThread> regionServerThreads = TEST_UTIL.getHBaseCluster().getRegionServerThreads();
public void testRegionServerCoprocessorsServiceError()
throws ExecutionException, InterruptedException {
final List<JVMClusterUtil.RegionServerThread> regionServerThreads =
TEST_UTIL.getHBaseCluster().getRegionServerThreads();
List<ServerName> serverNames = new ArrayList<>();
for (JVMClusterUtil.RegionServerThread t : regionServerThreads) {
serverNames.add(t.getRegionServer().getServerName());
}
DummyRegionServerEndpointProtos.DummyRequest request =
DummyRegionServerEndpointProtos.DummyRequest.getDefaultInstance();
try {
admin.<DummyRegionServerEndpointProtos.DummyService.Stub,
DummyRegionServerEndpointProtos.DummyResponse> coprocessorService(
DummyRegionServerEndpointProtos.DummyService::newStub,
(s, c, done) -> s.dummyThrow(c, request, done), serverNames)
.get();
fail("Should have thrown an exception");
} catch (Exception e) {
assertTrue(e.getCause() instanceof RetriesExhaustedException);
assertTrue(e.getCause().getMessage().contains(WHAT_TO_THROW.getClass().getName().trim()));
}
regionServerThreads.forEach(t -> serverNames.add(t.getRegionServer().getServerName()));
DummyRequest request = DummyRequest.getDefaultInstance();
Map<ServerName, Object> resultMap =
admin.<DummyService.Stub, DummyResponse> coprocessorService(DummyService::newStub,
(s, c, done) -> s.dummyThrow(c, request, done), serverNames).get();
resultMap.forEach((k, v) -> {
assertTrue(v instanceof RetriesExhaustedException);
Throwable e = (Throwable) v;
assertTrue(e.getMessage().contains(WHAT_TO_THROW.getClass().getName().trim()));
});
}
public static class DummyRegionServerEndpoint extends DummyService
implements RegionServerCoprocessor {
public DummyRegionServerEndpoint() {
}
@Override
public Iterable<Service> getServices() {
return Collections.singleton(this);