diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java index 245c5194cb0..6f3fa23f67d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java @@ -48,14 +48,10 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.metrics.ScanMetrics; import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.security.User; -import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.hadoop.ipc.RemoteException; @@ -129,68 +125,6 @@ public final class ConnectionUtils { log.info(sn + " server-side Connection retries=" + retries); } - /** - * A ClusterConnection that will short-circuit RPC making direct invocations against the localhost - * if the invocation target is 'this' server; save on network and protobuf invocations. - */ - // TODO This has to still do PB marshalling/unmarshalling stuff. Check how/whether we can avoid. - // Class is visible so can assert we are short-circuiting when expected. - public static class ShortCircuitingClusterConnection extends ConnectionImplementation { - private final ServerName serverName; - private final AdminService.BlockingInterface localHostAdmin; - private final ClientService.BlockingInterface localHostClient; - - private ShortCircuitingClusterConnection(Configuration conf, ExecutorService pool, User user, - ServerName serverName, AdminService.BlockingInterface admin, - ClientService.BlockingInterface client) throws IOException { - super(conf, pool, user); - this.serverName = serverName; - this.localHostAdmin = admin; - this.localHostClient = client; - } - - @Override - public AdminService.BlockingInterface getAdmin(ServerName sn) throws IOException { - return serverName.equals(sn) ? this.localHostAdmin : super.getAdmin(sn); - } - - @Override - public ClientService.BlockingInterface getClient(ServerName sn) throws IOException { - return serverName.equals(sn) ? this.localHostClient : super.getClient(sn); - } - - @Override - public MasterKeepAliveConnection getMaster() throws IOException { - if (this.localHostClient instanceof MasterService.BlockingInterface) { - return new ShortCircuitMasterConnection( - (MasterService.BlockingInterface) this.localHostClient); - } - return super.getMaster(); - } - } - - /** - * Creates a short-circuit connection that can bypass the RPC layer (serialization, - * deserialization, networking, etc..) when talking to a local server. 
- * @param conf the current configuration - * @param pool the thread pool to use for batch operations - * @param user the user the connection is for - * @param serverName the local server name - * @param admin the admin interface of the local server - * @param client the client interface of the local server - * @return an short-circuit connection. - * @throws IOException if IO failure occurred - */ - public static ClusterConnection createShortCircuitConnection(final Configuration conf, - ExecutorService pool, User user, final ServerName serverName, - final AdminService.BlockingInterface admin, final ClientService.BlockingInterface client) - throws IOException { - if (user == null) { - user = UserProvider.instantiate(conf).getCurrent(); - } - return new ShortCircuitingClusterConnection(conf, pool, user, serverName, admin, client); - } - /** * Setup the connection class, so that it will not depend on master being online. Used for testing * @param conf configuration to set diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ServerConnectionUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ServerConnectionUtils.java new file mode 100644 index 00000000000..658b42c181a --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ServerConnectionUtils.java @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.client; + +import java.io.IOException; +import java.util.Optional; +import java.util.concurrent.ExecutorService; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.coprocessor.RegionObserver; +import org.apache.hadoop.hbase.ipc.RpcCall; +import org.apache.hadoop.hbase.ipc.RpcServer; +import org.apache.hadoop.hbase.ipc.ServerCall; +import org.apache.hadoop.hbase.regionserver.RSRpcServices; +import org.apache.hadoop.hbase.regionserver.RegionScanner; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.security.UserProvider; +import org.apache.yetus.audience.InterfaceAudience; + +import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; +import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService; + +@InterfaceAudience.Private +public final class ServerConnectionUtils { + + private ServerConnectionUtils() { + } + + /** + * A ClusterConnection that will short-circuit RPC making direct invocations against the localhost + * if the invocation target is 'this' server; save on network and protobuf invocations. + */ + // TODO This has to still do PB marshalling/unmarshalling stuff. Check how/whether we can avoid. + // Class is visible so can assert we are short-circuiting when expected. 
+  public final static class ShortCircuitingClusterConnection extends ConnectionImplementation {
+    private final ServerName serverName;
+    private final AdminService.BlockingInterface localHostAdmin;
+    private final ClientService.BlockingInterface localHostClient;
+    private final ClientService.BlockingInterface localClientServiceBlockingInterfaceWrapper;
+
+    private ShortCircuitingClusterConnection(Configuration conf, ExecutorService pool, User user,
+      ServerName serverName, AdminService.BlockingInterface admin,
+      ClientService.BlockingInterface client) throws IOException {
+      super(conf, pool, user);
+      this.serverName = serverName;
+      this.localHostAdmin = admin;
+      this.localHostClient = client;
+      this.localClientServiceBlockingInterfaceWrapper =
+        new ClientServiceBlockingInterfaceWrapper(this.localHostClient);
+    }
+
+    @Override
+    public AdminService.BlockingInterface getAdmin(ServerName sn) throws IOException {
+      return serverName.equals(sn) ? this.localHostAdmin : super.getAdmin(sn);
+    }
+
+    @Override
+    public ClientService.BlockingInterface getClient(ServerName sn) throws IOException {
+      return serverName.equals(sn) ? this.localClientServiceBlockingInterfaceWrapper
+        : super.getClient(sn);
+    }
+
+    @Override
+    public MasterKeepAliveConnection getMaster() throws IOException {
+      if (this.localHostClient instanceof MasterService.BlockingInterface) {
+        return new ShortCircuitMasterConnection(
+          (MasterService.BlockingInterface) this.localHostClient);
+      }
+      return super.getMaster();
+    }
+
+    /**
+     * When a region coprocessor (for example {@link RegionObserver#postScannerOpen}) reads other
+     * rows on the same RegionServer through a {@link ShortCircuitingClusterConnection}, the
+     * {@link RegionScanner} created for the direct {@link RSRpcServices#get} may not be closed
+     * until the outermost rpc call completes if there is an outer {@link RpcCall}; even worse,
+     * the outer {@link ServerCall#rpcCallback} may be overridden, which causes serious problems.
+     * Therefore {@link ShortCircuitingClusterConnection#getClient} does not return
+     * {@link ShortCircuitingClusterConnection#localHostClient} directly but wraps it in this
+     * class, which uses {@link RpcServer#unsetCurrentCall} and {@link RpcServer#setCurrentCall}
+     * to surround the get, multi and scan calls, so the {@link RegionScanner} created for the
+     * direct {@link RSRpcServices#get} can be closed immediately. See HBASE-26812 for details.
+     */
+    static class ClientServiceBlockingInterfaceWrapper
+      implements ClientService.BlockingInterface {
+
+      private ClientService.BlockingInterface target;
+
+      ClientServiceBlockingInterfaceWrapper(ClientService.BlockingInterface target) {
+        this.target = target;
+      }
+
+      @Override
+      public GetResponse get(RpcController controller, GetRequest request)
+        throws ServiceException {
+        return this.doCall(controller, request, (c, r) -> {
+          return target.get(c, r);
+        });
+      }
+
+      @Override
+      public MultiResponse multi(RpcController controller, MultiRequest request)
+        throws ServiceException {
+        // Multi-gets arrive through this method, so it is wrapped as well.
+        return this.doCall(controller, request, (c, r) -> {
+          return target.multi(c, r);
+        });
+      }
+
+      @Override
+      public ScanResponse scan(RpcController controller, ScanRequest request)
+        throws ServiceException {
+        return this.doCall(controller, request, (c, r) -> {
+          return target.scan(c, r);
+        });
+      }
+
+      interface Operation<REQUEST, RESPONSE> {
+        RESPONSE call(RpcController controller, REQUEST request) throws ServiceException;
+      }
+
+      private <REQUEST, RESPONSE> RESPONSE doCall(RpcController controller, REQUEST request,
+        Operation<REQUEST, RESPONSE> operation) throws ServiceException {
+        Optional<RpcCall> rpcCallOptional = RpcServer.unsetCurrentCall();
+        try {
+          return operation.call(controller, request);
+        } finally {
+          rpcCallOptional.ifPresent(RpcServer::setCurrentCall);
+        }
+      }
+
+      @Override
+      public MutateResponse mutate(RpcController controller, MutateRequest request)
+        throws ServiceException {
+        return target.mutate(controller, request);
+      }
+
+      @Override
+      public BulkLoadHFileResponse bulkLoadHFile(RpcController controller,
+        BulkLoadHFileRequest request) throws ServiceException {
+        return target.bulkLoadHFile(controller, request);
+      }
+
+      @Override
+      public PrepareBulkLoadResponse prepareBulkLoad(RpcController controller,
+        PrepareBulkLoadRequest request) throws ServiceException {
+        return target.prepareBulkLoad(controller, request);
+      }
+
+      @Override
+      public CleanupBulkLoadResponse cleanupBulkLoad(RpcController controller,
+        CleanupBulkLoadRequest request) throws ServiceException {
+        return target.cleanupBulkLoad(controller, request);
+      }
+
+      @Override
+      public CoprocessorServiceResponse execService(RpcController controller,
+        CoprocessorServiceRequest request) throws ServiceException {
+        return target.execService(controller, request);
+      }
+
+      @Override
+      public CoprocessorServiceResponse execRegionServerService(RpcController controller,
+        CoprocessorServiceRequest request) throws ServiceException {
+        return target.execRegionServerService(controller, request);
+      }
+    }
+  }
+
+  /**
+   * Creates a short-circuit connection that can bypass the RPC layer (serialization,
+   * deserialization, networking, etc.) when talking to a local server.
+   * @param conf the current configuration
+   * @param pool the thread pool to use for batch operations
+   * @param user the user the connection is for
+   * @param serverName the local server name
+   * @param admin the admin interface of the local server
+   * @param client the client interface of the local server
+   * @return a short-circuit connection.
+ * @throws IOException if IO failure occurred + */ + public static ClusterConnection createShortCircuitConnection(final Configuration conf, + ExecutorService pool, User user, final ServerName serverName, + final AdminService.BlockingInterface admin, final ClientService.BlockingInterface client) + throws IOException { + if (user == null) { + user = UserProvider.instantiate(conf).getCurrent(); + } + return new ShortCircuitingClusterConnection(conf, pool, user, serverName, admin, client); + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorEnvironment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorEnvironment.java index 84e6d25e769..162b41cfdb4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorEnvironment.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorEnvironment.java @@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.coprocessor; import java.io.IOException; import java.util.concurrent.ConcurrentMap; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.HBaseInterfaceAudience; @@ -32,6 +31,7 @@ import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.metrics.MetricRegistry; import org.apache.hadoop.hbase.regionserver.OnlineRegions; import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.security.User; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; @@ -58,51 +58,53 @@ public interface RegionCoprocessorEnvironment extends CoprocessorEnvironmentDo not close! This is a shared connection - * with the hosting server. Throws {@link UnsupportedOperationException} if you try to close - * or abort it. - * - * For light-weight usage only. Heavy-duty usage will pull down - * the hosting RegionServer responsiveness as well as that of other Coprocessors making use of - * this Connection. Use to create table on start or to do administrative operations. Coprocessors - * should create their own Connections if heavy usage to avoid impinging on hosting Server - * operation. To create a Connection or if a Coprocessor requires a region with a particular - * Configuration, use {@link org.apache.hadoop.hbase.client.ConnectionFactory} or + * Returns the hosts' Connection to the Cluster. Do not close! This is a shared connection with + * the hosting server. Throws {@link UnsupportedOperationException} if you try to close or abort + * it. For light-weight usage only. Heavy-duty usage will pull down the hosting RegionServer + * responsiveness as well as that of other Coprocessors making use of this Connection. Use to + * create table on start or to do administrative operations. Coprocessors should create their own + * Connections if heavy usage to avoid impinging on hosting Server operation. To create a + * Connection or if a Coprocessor requires a region with a particular Configuration, use + * {@link org.apache.hadoop.hbase.client.ConnectionFactory} or * {@link #createConnection(Configuration)}}. - * - *

Be aware that operations that make use of this Connection are executed as the RegionServer + *

+ * Be aware that operations that make use of this Connection are executed as the RegionServer * User, the hbase super user that started this server process. Exercise caution running - * operations as this User (See {@link #createConnection(Configuration)}} to run as other than - * the RegionServer User). - * - *

Be careful RPC'ing from a Coprocessor context. RPC's will fail, stall, retry, and/or crawl + * operations as this User (See {@link #createConnection(Configuration)}} to run as other than the + * RegionServer User). + *

+ * Be careful RPC'ing from a Coprocessor context. RPC's will fail, stall, retry, and/or crawl * because the remote side is not online, is struggling or it is on the other side of a network * partition. Any use of Connection from inside a Coprocessor must be able to handle all such * hiccups. - * + *

+   * NOTE: the returned Connection is created using {@link User#getCurrent}, so it acts as the
+   * System User that runs the RegionServer, not the client User that initiated the RPC.
    * @see #createConnection(Configuration)
    * @return The host's Connection to the Cluster.
    */
   Connection getConnection();

   /**
-   * Creates a cluster connection using the passed Configuration.
-   *
-   * Creating a Connection is a heavy-weight operation. The resultant Connection's cache of
-   * region locations will be empty. Therefore you should cache and reuse Connections rather than
-   * create a Connection on demand. Create on start of your Coprocessor. You will have to cast
-   * the CoprocessorEnvironment appropriately to get at this API at start time because
-   * Coprocessor start method is passed a subclass of this CoprocessorEnvironment or fetch
-   * Connection using a synchronized accessor initializing the Connection on first access. Close
-   * the returned Connection when done to free resources. Using this API rather
-   * than {@link org.apache.hadoop.hbase.client.ConnectionFactory#createConnection(Configuration)}
+   * Creates a cluster connection using the passed Configuration. Creating a Connection is a
+   * heavy-weight operation. The resultant Connection's cache of region locations will be empty.
+   * Therefore you should cache and reuse Connections rather than create a Connection on demand.
+   * Create on start of your Coprocessor. You will have to cast the CoprocessorEnvironment
+   * appropriately to get at this API at start time because Coprocessor start method is passed a
+   * subclass of this CoprocessorEnvironment or fetch Connection using a synchronized accessor
+   * initializing the Connection on first access. Close the returned Connection when done to free
+   * resources. Using this API rather than
+   * {@link org.apache.hadoop.hbase.client.ConnectionFactory#createConnection(Configuration)}
    * returns a Connection that will short-circuit RPC if the target is a local resource. Use
    * ConnectionFactory if you don't need this ability.
-   *
-   *

Be careful RPC'ing from a Coprocessor context. RPC's will fail, stall, retry, and/or crawl + *

+ * Be careful RPC'ing from a Coprocessor context. RPC's will fail, stall, retry, and/or crawl * because the remote side is not online, is struggling or it is on the other side of a network * partition. Any use of Connection from inside a Coprocessor must be able to handle all such * hiccups. + *

+   * NOTE: the returned Connection is created using {@link User#getCurrent}, so it acts as the
+   * System User that runs the RegionServer, not the client User that initiated the RPC.
    * @return Connection created using the passed conf.
    */
   Connection createConnection(Configuration conf) throws IOException;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index dc35ceb6c29..1e647ac06df 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -695,6 +695,7 @@ public abstract class RpcServer implements RpcServerInterface,
     return rpcCall;
   }

+
   /**
    * Used by {@link org.apache.hadoop.hbase.procedure2.store.region.RegionProcedureStore}. Set the
    * rpc call back after mutate region.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java
index 9a4c11ad404..a0d46474c5a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.ipc;

+import com.google.errorprone.annotations.RestrictedApi;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.nio.ByteBuffer;
@@ -559,4 +560,10 @@ public abstract class ServerCall implements RpcCa
       return response;
     }
   }
+
+  @RestrictedApi(explanation = "Should only be called in tests", link = "",
+      allowedOnPath = ".*/src/test/.*")
+  public synchronized RpcCallback getCallBack() {
+    return this.rpcCallback;
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 48f4f32979c..dab7d14108c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -411,7 +411,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   static final long DEFAULT_ROW_PROCESSOR_TIMEOUT = 60 * 1000L;
   final ExecutorService rowProcessorExecutor = Executors.newCachedThreadPool();

-  private final ConcurrentHashMap scannerReadPoints;
+  final ConcurrentHashMap scannerReadPoints;

   /**
    * The sequence ID that was enLongAddered when this region was opened.
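For context, a minimal sketch (not part of the patch; class, table and column names are illustrative) of the coprocessor pattern this change targets: a RegionObserver that reads another row through the environment's short-circuit connection from postScannerOpen. With the wrapper above, the inner get runs with the outer RpcCall detached, so the RegionScanner it opens server-side is closed immediately instead of lingering until the outer scan RPC finishes.

import java.io.IOException;
import java.util.Optional;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.util.Bytes;

public class SideLookupObserver implements RegionCoprocessor, RegionObserver {
  // Illustrative table name, not taken from the patch.
  private static final TableName LOOKUP_TABLE = TableName.valueOf("lookup");

  @Override
  public Optional<RegionObserver> getRegionObserver() {
    return Optional.of(this);
  }

  @Override
  public RegionScanner postScannerOpen(ObserverContext<RegionCoprocessorEnvironment> ctx,
    Scan scan, RegionScanner scanner) throws IOException {
    RegionCoprocessorEnvironment env = ctx.getEnvironment();
    // The connection is created per call only to keep the sketch short; a real coprocessor
    // should create it once in start() and reuse it. As the javadoc above notes, it runs as
    // the RegionServer's system user, not as the client that issued the outer RPC.
    try (Connection conn = env.createConnection(env.getConfiguration());
      Table lookup = conn.getTable(LOOKUP_TABLE)) {
      // Short-circuited to the local RSRpcServices; the wrapper detaches the outer RpcCall
      // around this get, so the scanner it opens internally is released right away.
      Result row = lookup.get(new Get(Bytes.toBytes("config-row")).addFamily(Bytes.toBytes("f")));
      // ... use `row` to decide how to post-process the scanner ...
    }
    return scanner;
  }
}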
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index bcbf93d9b89..c87a1e2f22b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -92,6 +92,7 @@ import org.apache.hadoop.hbase.client.ConnectionUtils; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.RegionInfoBuilder; import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory; +import org.apache.hadoop.hbase.client.ServerConnectionUtils; import org.apache.hadoop.hbase.client.locking.EntityLock; import org.apache.hadoop.hbase.client.locking.LockServiceClient; import org.apache.hadoop.hbase.conf.ConfigurationManager; @@ -863,7 +864,7 @@ public class HRegionServer extends Thread implements // Create a cluster connection that when appropriate, can short-circuit and go directly to the // local server if the request is to the local server bypassing RPC. Can be used for both local // and remote invocations. - return ConnectionUtils.createShortCircuitConnection(conf, null, userProvider.getCurrent(), + return ServerConnectionUtils.createShortCircuitConnection(conf, null, userProvider.getCurrent(), serverName, rpcServices, rpcServices); } @@ -3846,7 +3847,7 @@ public class HRegionServer extends Thread implements @Override public Connection createConnection(Configuration conf) throws IOException { User user = UserProvider.instantiate(conf).getCurrent(); - return ConnectionUtils.createShortCircuitConnection(conf, null, user, this.serverName, + return ServerConnectionUtils.createShortCircuitConnection(conf, null, user, this.serverName, this.rpcServices, this.rpcServices); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestShortCircuitConnection.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestShortCircuitConnection.java index 991cfb9e481..0f77a458e14 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestShortCircuitConnection.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestShortCircuitConnection.java @@ -83,7 +83,9 @@ public class TestShortCircuitConnection { AdminService.BlockingInterface admin = connection.getAdmin(regionServer.getServerName()); ClientService.BlockingInterface client = connection.getClient(regionServer.getServerName()); assertTrue(admin instanceof RSRpcServices); - assertTrue(client instanceof RSRpcServices); + assertTrue( + client instanceof ServerConnectionUtils.ShortCircuitingClusterConnection + .ClientServiceBlockingInterfaceWrapper); ServerName anotherSn = ServerName.valueOf(regionServer.getServerName().getAddress(), EnvironmentEdgeManager.currentTime()); admin = connection.getAdmin(anotherSn); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorShortCircuitRPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorShortCircuitRPC.java index 6cff379a443..4d322707aa7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorShortCircuitRPC.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorShortCircuitRPC.java @@ -26,7 +26,7 @@ import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.SharedConnection; import 
org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.ConnectionUtils; +import org.apache.hadoop.hbase.client.ServerConnectionUtils; import org.apache.hadoop.hbase.testclassification.CoprocessorTests; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.junit.AfterClass; @@ -119,7 +119,7 @@ public class TestCoprocessorShortCircuitRPC { } private static void checkShortCircuit(Connection connection) { - assertTrue(connection instanceof ConnectionUtils.ShortCircuitingClusterConnection); + assertTrue(connection instanceof ServerConnectionUtils.ShortCircuitingClusterConnection); } @Test diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestShortCircuitGet.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestShortCircuitGet.java new file mode 100644 index 00000000000..14232567f24 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestShortCircuitGet.java @@ -0,0 +1,355 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver; + +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.MiniHBaseCluster.MiniHBaseClusterRegionServer; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.coprocessor.RegionObserver; +import org.apache.hadoop.hbase.ipc.RpcServer; +import org.apache.hadoop.hbase.ipc.ServerCall; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; +import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; + +@Category({ RegionServerTests.class, MediumTests.class }) +public class TestShortCircuitGet { + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestShortCircuitGet.class); + + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static final byte[] FAMILY = Bytes.toBytes("testFamily"); + private static final byte[] QUALIFIER = Bytes.toBytes("testQualifier"); + private static final byte[] VALUE = Bytes.toBytes("testValue"); + + static final byte[] r0 = Bytes.toBytes("row-0"); + static final byte[] r1 = Bytes.toBytes("row-1"); + static final byte[] r2 = Bytes.toBytes("row-2"); + static final byte[] r3 = Bytes.toBytes("row-3"); + static final byte[] r4 = Bytes.toBytes("row-4"); + static final byte[] r5 = Bytes.toBytes("row-5"); + static final byte[] r6 = Bytes.toBytes("row-6"); + static final TableName tableName = TableName.valueOf("TestShortCircuitGet"); + + @Rule + public TestName name = new TestName(); + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + 
conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 30 * 60 * 1000); + conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 30 * 60 * 1000); + + conf.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 60 * 60 * 1000); + conf.setStrings(HConstants.REGION_SERVER_IMPL, MyRegionServer.class.getName()); + conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); + conf.setInt(HConstants.HBASE_CLIENT_PAUSE, 10000); + TEST_UTIL.startMiniCluster(1); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + /** + * This test is for HBASE-26821,when we initiate get or scan in cp, the {@link RegionScanner} for + * get and scan is close when get or scan is completed. + */ + @Test + public void testScannerCloseWhenScanAndGetInCP() throws Exception { + Table table = null; + try { + table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY }, 1, + HConstants.DEFAULT_BLOCKSIZE, MyScanObserver.class.getName()); + putToTable(table, r0); + putToTable(table, r1); + putToTable(table, r2); + putToTable(table, r3); + putToTable(table, r4); + putToTable(table, r5); + putToTable(table, r6); + } finally { + if (table != null) { + table.close(); + } + } + + final Configuration conf = TEST_UTIL.getConfiguration(); + ResultScanner resultScanner = null; + Connection conn = null; + Table clientTable = null; + try { + conn = ConnectionFactory.createConnection(conf); + clientTable = conn.getTable(tableName); + Scan scan = new Scan(); + scan.setCaching(1); + scan.withStartRow(r0, true).withStopRow(r1, true); + resultScanner = table.getScanner(scan); + Result result = resultScanner.next(); + assertTrue("Expected row: row-0", Bytes.equals(r0, result.getRow())); + result = resultScanner.next(); + assertTrue("Expected row: row-1", Bytes.equals(r1, result.getRow())); + assertNull(resultScanner.next()); + } finally { + if (resultScanner != null) { + resultScanner.close(); + } + if (clientTable != null) { + clientTable.close(); + } + if (conn != null) { + conn.close(); + } + } + + assertTrue(MyRSRpcServices.exceptionRef.get() == null); + assertTrue(MyScanObserver.exceptionRef.get() == null); + } + + private void putToTable(Table ht, byte[] rowkey) throws IOException { + Put put = new Put(rowkey); + put.addColumn(FAMILY, QUALIFIER, VALUE); + ht.put(put); + } + + private static class MyRegionServer extends MiniHBaseClusterRegionServer { + public MyRegionServer(Configuration conf) + throws IOException, InterruptedException { + super(conf); + } + + @Override + protected RSRpcServices createRpcServices() throws IOException { + return new MyRSRpcServices(this); + } + } + + private static class MyRSRpcServices extends RSRpcServices { + private static AtomicReference exceptionRef = new AtomicReference(null); + public MyRSRpcServices(HRegionServer rs) throws IOException { + super(rs); + } + + @Override + public MultiResponse multi(RpcController rpcc, MultiRequest request) throws ServiceException { + try { + if (!MyScanObserver.inCP) { + return super.multi(rpcc, request); + } + + assertTrue(!RpcServer.getCurrentCall().isPresent()); + return super.multi(rpcc, request); + } catch (Throwable e) { + exceptionRef.set(e); + throw new ServiceException(e); + } + } + + @Override + public ScanResponse scan(RpcController controller, ScanRequest request) + throws ServiceException { + try { + if (!MyScanObserver.inCP) { + return super.scan(controller, request); + } + + HRegion region = null; + if (request.hasRegion()) { + region = this.getRegion(request.getRegion()); + } + + if 
(region != null + && TableName.isMetaTableName(region.getTableDescriptor().getTableName())) { + return super.scan(controller, request); + } + + assertTrue(!RpcServer.getCurrentCall().isPresent()); + return super.scan(controller, request); + } catch (Throwable e) { + exceptionRef.set(e); + throw new ServiceException(e); + } + } + + @Override + public GetResponse get(RpcController controller, GetRequest request) throws ServiceException { + try { + if (!MyScanObserver.inCP) { + return super.get(controller, request); + } + + HRegion region = null; + if (request.hasRegion()) { + region = this.getRegion(request.getRegion()); + } + if (region != null + && TableName.isMetaTableName(region.getTableDescriptor().getTableName())) { + return super.get(controller, request); + } + + assertTrue(!RpcServer.getCurrentCall().isPresent()); + return super.get(controller, request); + } catch (Throwable e) { + exceptionRef.set(e); + throw new ServiceException(e); + } + } + } + + public static class MyScanObserver implements RegionCoprocessor, RegionObserver { + + private static volatile boolean inCP = false; + private static AtomicReference exceptionRef = new AtomicReference(null); + @Override + public Optional getRegionObserver() { + return Optional.of(this); + } + + @SuppressWarnings("rawtypes") + @Override + public RegionScanner postScannerOpen( + final ObserverContext observerContext, final Scan scan, + final RegionScanner regionScanner) throws IOException { + + if (inCP) { + return regionScanner; + } + + HRegion region = (HRegion) observerContext.getEnvironment().getRegion(); + int prevScannerCount = region.scannerReadPoints.size(); + Table table1 = null; + Get get2 = new Get(r2); + inCP = true; + try (Connection connection = observerContext.getEnvironment() + .createConnection(observerContext.getEnvironment().getConfiguration())) { + try { + table1 = connection.getTable(tableName); + Result result = table1.get(get2); + assertTrue("Expected row: row-2", Bytes.equals(r2, result.getRow())); + + } finally { + if (table1 != null) { + table1.close(); + } + inCP = false; + } + + // RegionScanner is closed and there is no rpcCallBack set + assertTrue(prevScannerCount == region.scannerReadPoints.size()); + ServerCall serverCall = (ServerCall) RpcServer.getCurrentCall().get(); + assertTrue(serverCall.getCallBack() == null); + + Get get3 = new Get(r3); + Get get4 = new Get(r4); + Table table2 = null; + inCP = true; + try { + table2 = connection.getTable(tableName); + Result[] results = table2.get(Arrays.asList(get3, get4)); + assertTrue("Expected row: row-3", Bytes.equals(r3, results[0].getRow())); + assertTrue("Expected row: row-4", Bytes.equals(r4, results[1].getRow())); + } finally { + if (table2 != null) { + table2.close(); + } + inCP = false; + } + + // RegionScanner is closed and there is no rpcCallBack set + assertTrue(prevScannerCount == region.scannerReadPoints.size()); + serverCall = (ServerCall) RpcServer.getCurrentCall().get(); + assertTrue(serverCall.getCallBack() == null); + + Scan newScan = new Scan(); + newScan.setCaching(1); + newScan.withStartRow(r5, true).withStopRow(r6, true); + Table table3 = null; + ResultScanner resultScanner = null; + inCP = true; + try { + table3 = connection.getTable(tableName); + resultScanner = table3.getScanner(newScan); + Result result = resultScanner.next(); + assertTrue("Expected row: row-5", Bytes.equals(r5, result.getRow())); + result = resultScanner.next(); + assertTrue("Expected row: row-6", Bytes.equals(r6, result.getRow())); + result = resultScanner.next(); + 
assertNull(result); + } finally { + if (resultScanner != null) { + resultScanner.close(); + } + if (table3 != null) { + table3.close(); + } + inCP = false; + } + + // RegionScanner is closed and there is no rpcCallBack set + assertTrue(prevScannerCount == region.scannerReadPoints.size()); + serverCall = (ServerCall) RpcServer.getCurrentCall().get(); + assertTrue(serverCall.getCallBack() == null); + return regionScanner; + } catch (Throwable e) { + exceptionRef.set(e); + throw e; + } + } + } + +}
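The heart of the fix, shown standalone (a condensed sketch, not patch code; the helper name and the use of Callable are illustrative): save the caller's RpcCall, run the short-circuited get/multi/scan without it, then restore it, exactly as ClientServiceBlockingInterfaceWrapper#doCall does, so server-side resources opened by the inner request are not parked on the outer ServerCall.

import java.util.Optional;
import java.util.concurrent.Callable;
import org.apache.hadoop.hbase.ipc.RpcCall;
import org.apache.hadoop.hbase.ipc.RpcServer;

final class CurrentCallDetachment {
  private CurrentCallDetachment() {
  }

  // Detach the current RpcCall (if any) while the inner, short-circuited operation runs,
  // then put it back, mirroring ClientServiceBlockingInterfaceWrapper#doCall above.
  static <T> T runDetached(Callable<T> inner) throws Exception {
    Optional<RpcCall> saved = RpcServer.unsetCurrentCall();
    try {
      return inner.call();
    } finally {
      saved.ifPresent(RpcServer::setCurrentCall);
    }
  }
}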