HBASE-26812 ShortCircuitingClusterConnection fails to close RegionScanners when making short-circuited calls (#4302)

This commit is contained in:
chenglei 2022-03-31 20:56:41 +08:00 committed by GitHub
parent 780cea5c02
commit d8a5bb827b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 633 additions and 102 deletions

View File

@ -48,14 +48,10 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.ipc.RemoteException;
@ -129,68 +125,6 @@ public final class ConnectionUtils {
log.info(sn + " server-side Connection retries=" + retries);
}
/**
* A ClusterConnection that will short-circuit RPC making direct invocations against the localhost
* if the invocation target is 'this' server; save on network and protobuf invocations.
*/
// TODO This has to still do PB marshalling/unmarshalling stuff. Check how/whether we can avoid.
// Class is visible so can assert we are short-circuiting when expected.
public static class ShortCircuitingClusterConnection extends ConnectionImplementation {
private final ServerName serverName;
private final AdminService.BlockingInterface localHostAdmin;
private final ClientService.BlockingInterface localHostClient;
private ShortCircuitingClusterConnection(Configuration conf, ExecutorService pool, User user,
ServerName serverName, AdminService.BlockingInterface admin,
ClientService.BlockingInterface client) throws IOException {
super(conf, pool, user);
this.serverName = serverName;
this.localHostAdmin = admin;
this.localHostClient = client;
}
@Override
public AdminService.BlockingInterface getAdmin(ServerName sn) throws IOException {
return serverName.equals(sn) ? this.localHostAdmin : super.getAdmin(sn);
}
@Override
public ClientService.BlockingInterface getClient(ServerName sn) throws IOException {
return serverName.equals(sn) ? this.localHostClient : super.getClient(sn);
}
@Override
public MasterKeepAliveConnection getMaster() throws IOException {
if (this.localHostClient instanceof MasterService.BlockingInterface) {
return new ShortCircuitMasterConnection(
(MasterService.BlockingInterface) this.localHostClient);
}
return super.getMaster();
}
}
/**
* Creates a short-circuit connection that can bypass the RPC layer (serialization,
* deserialization, networking, etc..) when talking to a local server.
* @param conf the current configuration
* @param pool the thread pool to use for batch operations
* @param user the user the connection is for
* @param serverName the local server name
* @param admin the admin interface of the local server
* @param client the client interface of the local server
* @return an short-circuit connection.
* @throws IOException if IO failure occurred
*/
public static ClusterConnection createShortCircuitConnection(final Configuration conf,
ExecutorService pool, User user, final ServerName serverName,
final AdminService.BlockingInterface admin, final ClientService.BlockingInterface client)
throws IOException {
if (user == null) {
user = UserProvider.instantiate(conf).getCurrent();
}
return new ShortCircuitingClusterConnection(conf, pool, user, serverName, admin, client);
}
/**
* Setup the connection class, so that it will not depend on master being online. Used for testing
* @param conf configuration to set

View File

@ -0,0 +1,229 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.ipc.RpcCall;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.ServerCall;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
@InterfaceAudience.Private
public final class ServerConnectionUtils {
private ServerConnectionUtils() {
}
/**
* A ClusterConnection that will short-circuit RPC making direct invocations against the localhost
* if the invocation target is 'this' server; save on network and protobuf invocations.
*/
// TODO This has to still do PB marshalling/unmarshalling stuff. Check how/whether we can avoid.
// Class is visible so can assert we are short-circuiting when expected.
public final static class ShortCircuitingClusterConnection extends ConnectionImplementation {
private final ServerName serverName;
private final AdminService.BlockingInterface localHostAdmin;
private final ClientService.BlockingInterface localHostClient;
private final ClientService.BlockingInterface localClientServiceBlockingInterfaceWrapper;
private ShortCircuitingClusterConnection(Configuration conf, ExecutorService pool, User user,
ServerName serverName, AdminService.BlockingInterface admin,
ClientService.BlockingInterface client) throws IOException {
super(conf, pool, user);
this.serverName = serverName;
this.localHostAdmin = admin;
this.localHostClient = client;
this.localClientServiceBlockingInterfaceWrapper =
new ClientServiceBlockingInterfaceWrapper(this.localHostClient);
}
@Override
public AdminService.BlockingInterface getAdmin(ServerName sn) throws IOException {
return serverName.equals(sn) ? this.localHostAdmin : super.getAdmin(sn);
}
@Override
public ClientService.BlockingInterface getClient(ServerName sn) throws IOException {
return serverName.equals(sn) ? this.localClientServiceBlockingInterfaceWrapper
: super.getClient(sn);
}
@Override
public MasterKeepAliveConnection getMaster() throws IOException {
if (this.localHostClient instanceof MasterService.BlockingInterface) {
return new ShortCircuitMasterConnection(
(MasterService.BlockingInterface) this.localHostClient);
}
return super.getMaster();
}
/**
* When we directly invoke {@link RSRpcServices#get} on the same RegionServer through
* {@link ShortCircuitingClusterConnection} in region CPs such as
* {@link RegionObserver#postScannerOpen} to get other rows, the {@link RegionScanner} created
* for the directly {@link RSRpcServices#get} may not be closed until the outmost rpc call is
* completed if there is an outmost {@link RpcCall}, and even worse , the
* {@link ServerCall#rpcCallback} may be override which would cause serious problem,so for
* {@link ShortCircuitingClusterConnection#getClient}, if return
* {@link ShortCircuitingClusterConnection#localHostClient},we would add a wrapper class to wrap
* it , which using {@link RpcServer#unsetCurrentCall} and {RpcServer#setCurrentCall} to
* surround the scan and get method call,so the {@link RegionScanner} created for the directly
* {@link RSRpcServices#get} could be closed immediately,see HBASE-26812 for more.
*/
static class ClientServiceBlockingInterfaceWrapper
implements ClientService.BlockingInterface {
private ClientService.BlockingInterface target;
ClientServiceBlockingInterfaceWrapper(ClientService.BlockingInterface target) {
this.target = target;
}
@Override
public GetResponse get(RpcController controller, GetRequest request) throws ServiceException {
return this.doCall(controller, request, (c, r) -> {
return target.get(c, r);
});
}
@Override
public MultiResponse multi(RpcController controller, MultiRequest request)
throws ServiceException {
/**
* Here is for multiGet
*/
return this.doCall(controller, request, (c, r) -> {
return target.multi(c, r);
});
}
@Override
public ScanResponse scan(RpcController controller, ScanRequest request)
throws ServiceException {
return this.doCall(controller, request, (c, r) -> {
return target.scan(c, r);
});
}
interface Operation<REQUEST, RESPONSE> {
RESPONSE call(RpcController controller, REQUEST request) throws ServiceException;
}
private <REQUEST, RESPONSE> RESPONSE doCall(RpcController controller, REQUEST request,
Operation<REQUEST, RESPONSE> operation) throws ServiceException {
Optional<RpcCall> rpcCallOptional = RpcServer.unsetCurrentCall();
try {
return operation.call(controller, request);
} finally {
rpcCallOptional.ifPresent(RpcServer::setCurrentCall);
}
}
@Override
public MutateResponse mutate(RpcController controller, MutateRequest request)
throws ServiceException {
return target.mutate(controller, request);
}
@Override
public BulkLoadHFileResponse bulkLoadHFile(RpcController controller,
BulkLoadHFileRequest request) throws ServiceException {
return target.bulkLoadHFile(controller, request);
}
@Override
public PrepareBulkLoadResponse prepareBulkLoad(RpcController controller,
PrepareBulkLoadRequest request) throws ServiceException {
return target.prepareBulkLoad(controller, request);
}
@Override
public CleanupBulkLoadResponse cleanupBulkLoad(RpcController controller,
CleanupBulkLoadRequest request) throws ServiceException {
return target.cleanupBulkLoad(controller, request);
}
@Override
public CoprocessorServiceResponse execService(RpcController controller,
CoprocessorServiceRequest request) throws ServiceException {
return target.execService(controller, request);
}
@Override
public CoprocessorServiceResponse execRegionServerService(RpcController controller,
CoprocessorServiceRequest request) throws ServiceException {
return target.execRegionServerService(controller, request);
}
}
}
/**
* Creates a short-circuit connection that can bypass the RPC layer (serialization,
* deserialization, networking, etc..) when talking to a local server.
* @param conf the current configuration
* @param user the user the connection is for
* @param serverName the local server name
* @param admin the admin interface of the local server
* @param client the client interface of the local server
* @param registry the connection registry to be used, can be null
* @return an short-circuit connection.
* @throws IOException if IO failure occurred
*/
public static ClusterConnection createShortCircuitConnection(final Configuration conf,
ExecutorService pool, User user, final ServerName serverName,
final AdminService.BlockingInterface admin, final ClientService.BlockingInterface client)
throws IOException {
if (user == null) {
user = UserProvider.instantiate(conf).getCurrent();
}
return new ShortCircuitingClusterConnection(conf, pool, user, serverName, admin, client);
}
}

View File

@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.coprocessor;
import java.io.IOException;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
@ -32,6 +31,7 @@ import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.metrics.MetricRegistry;
import org.apache.hadoop.hbase.regionserver.OnlineRegions;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.security.User;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
@ -58,51 +58,53 @@ public interface RegionCoprocessorEnvironment extends CoprocessorEnvironment<Reg
ServerName getServerName();
/**
* Returns the hosts' Connection to the Cluster. <b>Do not close! This is a shared connection
* with the hosting server. Throws {@link UnsupportedOperationException} if you try to close
* or abort it</b>.
*
* For light-weight usage only. Heavy-duty usage will pull down
* the hosting RegionServer responsiveness as well as that of other Coprocessors making use of
* this Connection. Use to create table on start or to do administrative operations. Coprocessors
* should create their own Connections if heavy usage to avoid impinging on hosting Server
* operation. To create a Connection or if a Coprocessor requires a region with a particular
* Configuration, use {@link org.apache.hadoop.hbase.client.ConnectionFactory} or
* Returns the hosts' Connection to the Cluster. <b>Do not close! This is a shared connection with
* the hosting server. Throws {@link UnsupportedOperationException} if you try to close or abort
* it</b>. For light-weight usage only. Heavy-duty usage will pull down the hosting RegionServer
* responsiveness as well as that of other Coprocessors making use of this Connection. Use to
* create table on start or to do administrative operations. Coprocessors should create their own
* Connections if heavy usage to avoid impinging on hosting Server operation. To create a
* Connection or if a Coprocessor requires a region with a particular Configuration, use
* {@link org.apache.hadoop.hbase.client.ConnectionFactory} or
* {@link #createConnection(Configuration)}}.
*
* <p>Be aware that operations that make use of this Connection are executed as the RegionServer
* <p>
* Be aware that operations that make use of this Connection are executed as the RegionServer
* User, the hbase super user that started this server process. Exercise caution running
* operations as this User (See {@link #createConnection(Configuration)}} to run as other than
* the RegionServer User).
*
* <p>Be careful RPC'ing from a Coprocessor context. RPC's will fail, stall, retry, and/or crawl
* operations as this User (See {@link #createConnection(Configuration)}} to run as other than the
* RegionServer User).
* <p>
* Be careful RPC'ing from a Coprocessor context. RPC's will fail, stall, retry, and/or crawl
* because the remote side is not online, is struggling or it is on the other side of a network
* partition. Any use of Connection from inside a Coprocessor must be able to handle all such
* hiccups.
*
* <p>
* NOTE: the returned Connection is created using {@link User#getCurrent},so the Connection is as
* the System User who running the RegionServer, not the client User who initiate the RPC.
* @see #createConnection(Configuration)
* @return The host's Connection to the Cluster.
*/
Connection getConnection();
/**
* Creates a cluster connection using the passed Configuration.
*
* Creating a Connection is a heavy-weight operation. The resultant Connection's cache of
* region locations will be empty. Therefore you should cache and reuse Connections rather than
* create a Connection on demand. Create on start of your Coprocessor. You will have to cast
* the CoprocessorEnvironment appropriately to get at this API at start time because
* Coprocessor start method is passed a subclass of this CoprocessorEnvironment or fetch
* Connection using a synchronized accessor initializing the Connection on first access. Close
* the returned Connection when done to free resources. Using this API rather
* than {@link org.apache.hadoop.hbase.client.ConnectionFactory#createConnection(Configuration)}
* Creates a cluster connection using the passed Configuration. Creating a Connection is a
* heavy-weight operation. The resultant Connection's cache of region locations will be empty.
* Therefore you should cache and reuse Connections rather than create a Connection on demand.
* Create on start of your Coprocessor. You will have to cast the CoprocessorEnvironment
* appropriately to get at this API at start time because Coprocessor start method is passed a
* subclass of this CoprocessorEnvironment or fetch Connection using a synchronized accessor
* initializing the Connection on first access. Close the returned Connection when done to free
* resources. Using this API rather than
* {@link org.apache.hadoop.hbase.client.ConnectionFactory#createConnection(Configuration)}
* returns a Connection that will short-circuit RPC if the target is a local resource. Use
* ConnectionFactory if you don't need this ability.
*
* <p>Be careful RPC'ing from a Coprocessor context. RPC's will fail, stall, retry, and/or crawl
* <p>
* Be careful RPC'ing from a Coprocessor context. RPC's will fail, stall, retry, and/or crawl
* because the remote side is not online, is struggling or it is on the other side of a network
* partition. Any use of Connection from inside a Coprocessor must be able to handle all such
* hiccups.
* <p>
* NOTE: the returned Connection is created using {@link User#getCurrent},so the Connection is as
* the System User who running the RegionServer, not the client User who initiate the RPC.
* @return Connection created using the passed conf.
*/
Connection createConnection(Configuration conf) throws IOException;

View File

@ -695,6 +695,7 @@ public abstract class RpcServer implements RpcServerInterface,
return rpcCall;
}
/**
* Used by {@link org.apache.hadoop.hbase.procedure2.store.region.RegionProcedureStore}. Set the
* rpc call back after mutate region.

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.ipc;
import com.google.errorprone.annotations.RestrictedApi;
import java.io.IOException;
import java.net.InetAddress;
import java.nio.ByteBuffer;
@ -559,4 +560,10 @@ public abstract class ServerCall<T extends ServerRpcConnection> implements RpcCa
return response;
}
}
@RestrictedApi(explanation = "Should only be called in tests", link = "",
allowedOnPath = ".*/src/test/.*")
public synchronized RpcCallback getCallBack() {
return this.rpcCallback;
}
}

View File

@ -411,7 +411,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
static final long DEFAULT_ROW_PROCESSOR_TIMEOUT = 60 * 1000L;
final ExecutorService rowProcessorExecutor = Executors.newCachedThreadPool();
private final ConcurrentHashMap<RegionScanner, Long> scannerReadPoints;
final ConcurrentHashMap<RegionScanner, Long> scannerReadPoints;
/**
* The sequence ID that was enLongAddered when this region was opened.

View File

@ -92,6 +92,7 @@ import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
import org.apache.hadoop.hbase.client.ServerConnectionUtils;
import org.apache.hadoop.hbase.client.locking.EntityLock;
import org.apache.hadoop.hbase.client.locking.LockServiceClient;
import org.apache.hadoop.hbase.conf.ConfigurationManager;
@ -863,7 +864,7 @@ public class HRegionServer extends Thread implements
// Create a cluster connection that when appropriate, can short-circuit and go directly to the
// local server if the request is to the local server bypassing RPC. Can be used for both local
// and remote invocations.
return ConnectionUtils.createShortCircuitConnection(conf, null, userProvider.getCurrent(),
return ServerConnectionUtils.createShortCircuitConnection(conf, null, userProvider.getCurrent(),
serverName, rpcServices, rpcServices);
}
@ -3846,7 +3847,7 @@ public class HRegionServer extends Thread implements
@Override
public Connection createConnection(Configuration conf) throws IOException {
User user = UserProvider.instantiate(conf).getCurrent();
return ConnectionUtils.createShortCircuitConnection(conf, null, user, this.serverName,
return ServerConnectionUtils.createShortCircuitConnection(conf, null, user, this.serverName,
this.rpcServices, this.rpcServices);
}

View File

@ -83,7 +83,9 @@ public class TestShortCircuitConnection {
AdminService.BlockingInterface admin = connection.getAdmin(regionServer.getServerName());
ClientService.BlockingInterface client = connection.getClient(regionServer.getServerName());
assertTrue(admin instanceof RSRpcServices);
assertTrue(client instanceof RSRpcServices);
assertTrue(
client instanceof ServerConnectionUtils.ShortCircuitingClusterConnection
.ClientServiceBlockingInterfaceWrapper);
ServerName anotherSn = ServerName.valueOf(regionServer.getServerName().getAddress(),
EnvironmentEdgeManager.currentTime());
admin = connection.getAdmin(anotherSn);

View File

@ -26,7 +26,7 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.SharedConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.client.ServerConnectionUtils;
import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.junit.AfterClass;
@ -119,7 +119,7 @@ public class TestCoprocessorShortCircuitRPC {
}
private static void checkShortCircuit(Connection connection) {
assertTrue(connection instanceof ConnectionUtils.ShortCircuitingClusterConnection);
assertTrue(connection instanceof ServerConnectionUtils.ShortCircuitingClusterConnection);
}
@Test

View File

@ -0,0 +1,355 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.Arrays;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MiniHBaseCluster.MiniHBaseClusterRegionServer;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.ServerCall;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
@Category({ RegionServerTests.class, MediumTests.class })
public class TestShortCircuitGet {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestShortCircuitGet.class);
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static final byte[] FAMILY = Bytes.toBytes("testFamily");
private static final byte[] QUALIFIER = Bytes.toBytes("testQualifier");
private static final byte[] VALUE = Bytes.toBytes("testValue");
static final byte[] r0 = Bytes.toBytes("row-0");
static final byte[] r1 = Bytes.toBytes("row-1");
static final byte[] r2 = Bytes.toBytes("row-2");
static final byte[] r3 = Bytes.toBytes("row-3");
static final byte[] r4 = Bytes.toBytes("row-4");
static final byte[] r5 = Bytes.toBytes("row-5");
static final byte[] r6 = Bytes.toBytes("row-6");
static final TableName tableName = TableName.valueOf("TestShortCircuitGet");
@Rule
public TestName name = new TestName();
@BeforeClass
public static void setUpBeforeClass() throws Exception {
Configuration conf = TEST_UTIL.getConfiguration();
conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 30 * 60 * 1000);
conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 30 * 60 * 1000);
conf.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 60 * 60 * 1000);
conf.setStrings(HConstants.REGION_SERVER_IMPL, MyRegionServer.class.getName());
conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
conf.setInt(HConstants.HBASE_CLIENT_PAUSE, 10000);
TEST_UTIL.startMiniCluster(1);
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
TEST_UTIL.shutdownMiniCluster();
}
/**
* This test is for HBASE-26821,when we initiate get or scan in cp, the {@link RegionScanner} for
* get and scan is close when get or scan is completed.
*/
@Test
public void testScannerCloseWhenScanAndGetInCP() throws Exception {
Table table = null;
try {
table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY }, 1,
HConstants.DEFAULT_BLOCKSIZE, MyScanObserver.class.getName());
putToTable(table, r0);
putToTable(table, r1);
putToTable(table, r2);
putToTable(table, r3);
putToTable(table, r4);
putToTable(table, r5);
putToTable(table, r6);
} finally {
if (table != null) {
table.close();
}
}
final Configuration conf = TEST_UTIL.getConfiguration();
ResultScanner resultScanner = null;
Connection conn = null;
Table clientTable = null;
try {
conn = ConnectionFactory.createConnection(conf);
clientTable = conn.getTable(tableName);
Scan scan = new Scan();
scan.setCaching(1);
scan.withStartRow(r0, true).withStopRow(r1, true);
resultScanner = table.getScanner(scan);
Result result = resultScanner.next();
assertTrue("Expected row: row-0", Bytes.equals(r0, result.getRow()));
result = resultScanner.next();
assertTrue("Expected row: row-1", Bytes.equals(r1, result.getRow()));
assertNull(resultScanner.next());
} finally {
if (resultScanner != null) {
resultScanner.close();
}
if (clientTable != null) {
clientTable.close();
}
if (conn != null) {
conn.close();
}
}
assertTrue(MyRSRpcServices.exceptionRef.get() == null);
assertTrue(MyScanObserver.exceptionRef.get() == null);
}
private void putToTable(Table ht, byte[] rowkey) throws IOException {
Put put = new Put(rowkey);
put.addColumn(FAMILY, QUALIFIER, VALUE);
ht.put(put);
}
private static class MyRegionServer extends MiniHBaseClusterRegionServer {
public MyRegionServer(Configuration conf)
throws IOException, InterruptedException {
super(conf);
}
@Override
protected RSRpcServices createRpcServices() throws IOException {
return new MyRSRpcServices(this);
}
}
private static class MyRSRpcServices extends RSRpcServices {
private static AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>(null);
public MyRSRpcServices(HRegionServer rs) throws IOException {
super(rs);
}
@Override
public MultiResponse multi(RpcController rpcc, MultiRequest request) throws ServiceException {
try {
if (!MyScanObserver.inCP) {
return super.multi(rpcc, request);
}
assertTrue(!RpcServer.getCurrentCall().isPresent());
return super.multi(rpcc, request);
} catch (Throwable e) {
exceptionRef.set(e);
throw new ServiceException(e);
}
}
@Override
public ScanResponse scan(RpcController controller, ScanRequest request)
throws ServiceException {
try {
if (!MyScanObserver.inCP) {
return super.scan(controller, request);
}
HRegion region = null;
if (request.hasRegion()) {
region = this.getRegion(request.getRegion());
}
if (region != null
&& TableName.isMetaTableName(region.getTableDescriptor().getTableName())) {
return super.scan(controller, request);
}
assertTrue(!RpcServer.getCurrentCall().isPresent());
return super.scan(controller, request);
} catch (Throwable e) {
exceptionRef.set(e);
throw new ServiceException(e);
}
}
@Override
public GetResponse get(RpcController controller, GetRequest request) throws ServiceException {
try {
if (!MyScanObserver.inCP) {
return super.get(controller, request);
}
HRegion region = null;
if (request.hasRegion()) {
region = this.getRegion(request.getRegion());
}
if (region != null
&& TableName.isMetaTableName(region.getTableDescriptor().getTableName())) {
return super.get(controller, request);
}
assertTrue(!RpcServer.getCurrentCall().isPresent());
return super.get(controller, request);
} catch (Throwable e) {
exceptionRef.set(e);
throw new ServiceException(e);
}
}
}
public static class MyScanObserver implements RegionCoprocessor, RegionObserver {
private static volatile boolean inCP = false;
private static AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>(null);
@Override
public Optional<RegionObserver> getRegionObserver() {
return Optional.of(this);
}
@SuppressWarnings("rawtypes")
@Override
public RegionScanner postScannerOpen(
final ObserverContext<RegionCoprocessorEnvironment> observerContext, final Scan scan,
final RegionScanner regionScanner) throws IOException {
if (inCP) {
return regionScanner;
}
HRegion region = (HRegion) observerContext.getEnvironment().getRegion();
int prevScannerCount = region.scannerReadPoints.size();
Table table1 = null;
Get get2 = new Get(r2);
inCP = true;
try (Connection connection = observerContext.getEnvironment()
.createConnection(observerContext.getEnvironment().getConfiguration())) {
try {
table1 = connection.getTable(tableName);
Result result = table1.get(get2);
assertTrue("Expected row: row-2", Bytes.equals(r2, result.getRow()));
} finally {
if (table1 != null) {
table1.close();
}
inCP = false;
}
// RegionScanner is closed and there is no rpcCallBack set
assertTrue(prevScannerCount == region.scannerReadPoints.size());
ServerCall serverCall = (ServerCall) RpcServer.getCurrentCall().get();
assertTrue(serverCall.getCallBack() == null);
Get get3 = new Get(r3);
Get get4 = new Get(r4);
Table table2 = null;
inCP = true;
try {
table2 = connection.getTable(tableName);
Result[] results = table2.get(Arrays.asList(get3, get4));
assertTrue("Expected row: row-3", Bytes.equals(r3, results[0].getRow()));
assertTrue("Expected row: row-4", Bytes.equals(r4, results[1].getRow()));
} finally {
if (table2 != null) {
table2.close();
}
inCP = false;
}
// RegionScanner is closed and there is no rpcCallBack set
assertTrue(prevScannerCount == region.scannerReadPoints.size());
serverCall = (ServerCall) RpcServer.getCurrentCall().get();
assertTrue(serverCall.getCallBack() == null);
Scan newScan = new Scan();
newScan.setCaching(1);
newScan.withStartRow(r5, true).withStopRow(r6, true);
Table table3 = null;
ResultScanner resultScanner = null;
inCP = true;
try {
table3 = connection.getTable(tableName);
resultScanner = table3.getScanner(newScan);
Result result = resultScanner.next();
assertTrue("Expected row: row-5", Bytes.equals(r5, result.getRow()));
result = resultScanner.next();
assertTrue("Expected row: row-6", Bytes.equals(r6, result.getRow()));
result = resultScanner.next();
assertNull(result);
} finally {
if (resultScanner != null) {
resultScanner.close();
}
if (table3 != null) {
table3.close();
}
inCP = false;
}
// RegionScanner is closed and there is no rpcCallBack set
assertTrue(prevScannerCount == region.scannerReadPoints.size());
serverCall = (ServerCall) RpcServer.getCurrentCall().get();
assertTrue(serverCall.getCallBack() == null);
return regionScanner;
} catch (Throwable e) {
exceptionRef.set(e);
throw e;
}
}
}
}