HBASE-13480 ShortCircuitConnection doesn't short-circuit all calls as expected (Jingcheng Du and Josh Elser)
This commit is contained in:
parent
9334a47d45
commit
b85857cf3c
|
@ -1,467 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HRegionLocation;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.MasterNotRunningException;
|
||||
import org.apache.hadoop.hbase.RegionLocations;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
|
||||
import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MasterService;
|
||||
|
||||
/**
|
||||
* An internal class that delegates to an {@link HConnection} instance.
|
||||
* A convenience to override when customizing method implementations.
|
||||
*
|
||||
*
|
||||
* @see ConnectionUtils#createShortCircuitHConnection(Connection, ServerName,
|
||||
* AdminService.BlockingInterface, ClientService.BlockingInterface) for case where we make
|
||||
* Connections skip RPC if request is to local server.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@SuppressWarnings("deprecation")
|
||||
//NOTE: DO NOT make this class public. It was made package-private on purpose.
|
||||
abstract class ConnectionAdapter implements ClusterConnection {
|
||||
|
||||
private final ClusterConnection wrappedConnection;
|
||||
|
||||
public ConnectionAdapter(Connection c) {
|
||||
wrappedConnection = (ClusterConnection)c;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void abort(String why, Throwable e) {
|
||||
wrappedConnection.abort(why, e);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isAborted() {
|
||||
return wrappedConnection.isAborted();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
wrappedConnection.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Configuration getConfiguration() {
|
||||
return wrappedConnection.getConfiguration();
|
||||
}
|
||||
|
||||
@Override
|
||||
public HTableInterface getTable(String tableName) throws IOException {
|
||||
return wrappedConnection.getTable(tableName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public HTableInterface getTable(byte[] tableName) throws IOException {
|
||||
return wrappedConnection.getTable(tableName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public HTableInterface getTable(TableName tableName) throws IOException {
|
||||
return wrappedConnection.getTable(tableName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public HTableInterface getTable(String tableName, ExecutorService pool)
|
||||
throws IOException {
|
||||
return wrappedConnection.getTable(tableName, pool);
|
||||
}
|
||||
|
||||
@Override
|
||||
public HTableInterface getTable(byte[] tableName, ExecutorService pool)
|
||||
throws IOException {
|
||||
return wrappedConnection.getTable(tableName, pool);
|
||||
}
|
||||
|
||||
@Override
|
||||
public HTableInterface getTable(TableName tableName, ExecutorService pool)
|
||||
throws IOException {
|
||||
return wrappedConnection.getTable(tableName, pool);
|
||||
}
|
||||
|
||||
@Override
|
||||
public BufferedMutator getBufferedMutator(BufferedMutatorParams params)
|
||||
throws IOException {
|
||||
return wrappedConnection.getBufferedMutator(params);
|
||||
}
|
||||
|
||||
@Override
|
||||
public BufferedMutator getBufferedMutator(TableName tableName) throws IOException {
|
||||
return wrappedConnection.getBufferedMutator(tableName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public RegionLocator getRegionLocator(TableName tableName) throws IOException {
|
||||
return wrappedConnection.getRegionLocator(tableName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Admin getAdmin() throws IOException {
|
||||
return wrappedConnection.getAdmin();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isMasterRunning() throws MasterNotRunningException,
|
||||
ZooKeeperConnectionException {
|
||||
return wrappedConnection.isMasterRunning();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isTableEnabled(TableName tableName) throws IOException {
|
||||
return wrappedConnection.isTableEnabled(tableName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isTableEnabled(byte[] tableName) throws IOException {
|
||||
return wrappedConnection.isTableEnabled(tableName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isTableDisabled(TableName tableName) throws IOException {
|
||||
return wrappedConnection.isTableDisabled(tableName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isTableDisabled(byte[] tableName) throws IOException {
|
||||
return wrappedConnection.isTableDisabled(tableName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isTableAvailable(TableName tableName) throws IOException {
|
||||
return wrappedConnection.isTableAvailable(tableName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isTableAvailable(byte[] tableName) throws IOException {
|
||||
return wrappedConnection.isTableAvailable(tableName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isTableAvailable(TableName tableName, byte[][] splitKeys)
|
||||
throws IOException {
|
||||
return wrappedConnection.isTableAvailable(tableName, splitKeys);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isTableAvailable(byte[] tableName, byte[][] splitKeys)
|
||||
throws IOException {
|
||||
return wrappedConnection.isTableAvailable(tableName, splitKeys);
|
||||
}
|
||||
|
||||
@Override
|
||||
public TableState getTableState(TableName tableName) throws IOException {
|
||||
return wrappedConnection.getTableState(tableName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public HTableDescriptor[] listTables() throws IOException {
|
||||
return wrappedConnection.listTables();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String[] getTableNames() throws IOException {
|
||||
return wrappedConnection.getTableNames();
|
||||
}
|
||||
|
||||
@Override
|
||||
public TableName[] listTableNames() throws IOException {
|
||||
return wrappedConnection.listTableNames();
|
||||
}
|
||||
|
||||
@Override
|
||||
public HTableDescriptor getHTableDescriptor(TableName tableName)
|
||||
throws IOException {
|
||||
return wrappedConnection.getHTableDescriptor(tableName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public HTableDescriptor getHTableDescriptor(byte[] tableName)
|
||||
throws IOException {
|
||||
return wrappedConnection.getHTableDescriptor(tableName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public HRegionLocation locateRegion(TableName tableName, byte[] row)
|
||||
throws IOException {
|
||||
return wrappedConnection.locateRegion(tableName, row);
|
||||
}
|
||||
|
||||
@Override
|
||||
public HRegionLocation locateRegion(byte[] tableName, byte[] row)
|
||||
throws IOException {
|
||||
return wrappedConnection.locateRegion(tableName, row);
|
||||
}
|
||||
|
||||
@Override
|
||||
public RegionLocations locateRegion(TableName tableName, byte[] row, boolean useCache,
|
||||
boolean retry) throws IOException {
|
||||
return wrappedConnection.locateRegion(tableName, row, useCache, retry);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clearRegionCache() {
|
||||
wrappedConnection.clearRegionCache();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clearRegionCache(TableName tableName) {
|
||||
wrappedConnection.clearRegionCache(tableName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clearRegionCache(byte[] tableName) {
|
||||
wrappedConnection.clearRegionCache(tableName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deleteCachedRegionLocation(HRegionLocation location) {
|
||||
wrappedConnection.deleteCachedRegionLocation(location);
|
||||
}
|
||||
|
||||
@Override
|
||||
public HRegionLocation relocateRegion(TableName tableName, byte[] row)
|
||||
throws IOException {
|
||||
return wrappedConnection.relocateRegion(tableName, row);
|
||||
}
|
||||
|
||||
@Override
|
||||
public HRegionLocation relocateRegion(byte[] tableName, byte[] row)
|
||||
throws IOException {
|
||||
return wrappedConnection.relocateRegion(tableName, row);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void updateCachedLocations(TableName tableName, byte[] rowkey,
|
||||
Object exception, HRegionLocation source) {
|
||||
wrappedConnection.updateCachedLocations(tableName, rowkey, exception, source);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void updateCachedLocations(TableName tableName, byte[] regionName, byte[] rowkey,
|
||||
Object exception, ServerName source) {
|
||||
wrappedConnection.updateCachedLocations(tableName, regionName, rowkey, exception, source);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void updateCachedLocations(byte[] tableName, byte[] rowkey,
|
||||
Object exception, HRegionLocation source) {
|
||||
wrappedConnection.updateCachedLocations(tableName, rowkey, exception, source);
|
||||
}
|
||||
|
||||
@Override
|
||||
public HRegionLocation locateRegion(byte[] regionName) throws IOException {
|
||||
return wrappedConnection.locateRegion(regionName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<HRegionLocation> locateRegions(TableName tableName)
|
||||
throws IOException {
|
||||
return wrappedConnection.locateRegions(tableName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<HRegionLocation> locateRegions(byte[] tableName)
|
||||
throws IOException {
|
||||
return wrappedConnection.locateRegions(tableName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<HRegionLocation> locateRegions(TableName tableName,
|
||||
boolean useCache, boolean offlined) throws IOException {
|
||||
return wrappedConnection.locateRegions(tableName, useCache, offlined);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<HRegionLocation> locateRegions(byte[] tableName,
|
||||
boolean useCache, boolean offlined) throws IOException {
|
||||
return wrappedConnection.locateRegions(tableName, useCache, offlined);
|
||||
}
|
||||
|
||||
@Override
|
||||
public RegionLocations locateRegion(TableName tableName, byte[] row, boolean useCache,
|
||||
boolean retry, int replicaId) throws IOException {
|
||||
return wrappedConnection.locateRegion(tableName, row, useCache, retry, replicaId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public RegionLocations relocateRegion(TableName tableName, byte[] row, int replicaId)
|
||||
throws IOException {
|
||||
return wrappedConnection.relocateRegion(tableName, row, replicaId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public MasterService.BlockingInterface getMaster() throws IOException {
|
||||
return wrappedConnection.getMaster();
|
||||
}
|
||||
|
||||
@Override
|
||||
public AdminService.BlockingInterface getAdmin(
|
||||
ServerName serverName) throws IOException {
|
||||
return wrappedConnection.getAdmin(serverName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClientService.BlockingInterface getClient(
|
||||
ServerName serverName) throws IOException {
|
||||
return wrappedConnection.getClient(serverName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public AdminService.BlockingInterface getAdmin(
|
||||
ServerName serverName, boolean getMaster) throws IOException {
|
||||
return wrappedConnection.getAdmin(serverName, getMaster);
|
||||
}
|
||||
|
||||
@Override
|
||||
public HRegionLocation getRegionLocation(TableName tableName, byte[] row,
|
||||
boolean reload) throws IOException {
|
||||
return wrappedConnection.getRegionLocation(tableName, row, reload);
|
||||
}
|
||||
|
||||
@Override
|
||||
public HRegionLocation getRegionLocation(byte[] tableName, byte[] row,
|
||||
boolean reload) throws IOException {
|
||||
return wrappedConnection.getRegionLocation(tableName, row, reload);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void processBatch(List<? extends Row> actions, TableName tableName,
|
||||
ExecutorService pool, Object[] results) throws IOException,
|
||||
InterruptedException {
|
||||
wrappedConnection.processBatch(actions, tableName, pool, results);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void processBatch(List<? extends Row> actions, byte[] tableName,
|
||||
ExecutorService pool, Object[] results) throws IOException,
|
||||
InterruptedException {
|
||||
wrappedConnection.processBatch(actions, tableName, pool, results);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <R> void processBatchCallback(List<? extends Row> list,
|
||||
TableName tableName, ExecutorService pool, Object[] results,
|
||||
Callback<R> callback) throws IOException, InterruptedException {
|
||||
wrappedConnection.processBatchCallback(list, tableName, pool, results, callback);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <R> void processBatchCallback(List<? extends Row> list,
|
||||
byte[] tableName, ExecutorService pool, Object[] results,
|
||||
Callback<R> callback) throws IOException, InterruptedException {
|
||||
wrappedConnection.processBatchCallback(list, tableName, pool, results, callback);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRegionCachePrefetch(TableName tableName, boolean enable) {
|
||||
wrappedConnection.setRegionCachePrefetch(tableName, enable);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRegionCachePrefetch(byte[] tableName, boolean enable) {
|
||||
wrappedConnection.setRegionCachePrefetch(tableName, enable);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean getRegionCachePrefetch(TableName tableName) {
|
||||
return wrappedConnection.getRegionCachePrefetch(tableName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean getRegionCachePrefetch(byte[] tableName) {
|
||||
return wrappedConnection.getRegionCachePrefetch(tableName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getCurrentNrHRS() throws IOException {
|
||||
return wrappedConnection.getCurrentNrHRS();
|
||||
}
|
||||
|
||||
@Override
|
||||
public HTableDescriptor[] getHTableDescriptorsByTableName(
|
||||
List<TableName> tableNames) throws IOException {
|
||||
return wrappedConnection.getHTableDescriptorsByTableName(tableNames);
|
||||
}
|
||||
|
||||
@Override
|
||||
public HTableDescriptor[] getHTableDescriptors(List<String> tableNames)
|
||||
throws IOException {
|
||||
return wrappedConnection.getHTableDescriptors(tableNames);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isClosed() {
|
||||
return wrappedConnection.isClosed();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clearCaches(ServerName sn) {
|
||||
wrappedConnection.clearCaches(sn);
|
||||
}
|
||||
|
||||
@Override
|
||||
public MasterKeepAliveConnection getKeepAliveMasterService()
|
||||
throws MasterNotRunningException {
|
||||
return wrappedConnection.getKeepAliveMasterService();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isDeadServer(ServerName serverName) {
|
||||
return wrappedConnection.isDeadServer(serverName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public NonceGenerator getNonceGenerator() {
|
||||
return wrappedConnection.getNonceGenerator();
|
||||
}
|
||||
|
||||
@Override
|
||||
public AsyncProcess getAsyncProcess() {
|
||||
return wrappedConnection.getAsyncProcess();
|
||||
}
|
||||
|
||||
@Override
|
||||
public RpcRetryingCallerFactory getNewRpcRetryingCallerFactory(Configuration conf) {
|
||||
return wrappedConnection.getNewRpcRetryingCallerFactory(conf);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ServerStatisticTracker getStatisticsTracker() {
|
||||
return wrappedConnection.getStatisticsTracker();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClientBackoffPolicy getBackoffPolicy() {
|
||||
return wrappedConnection.getBackoffPolicy();
|
||||
}
|
||||
}
|
|
@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
|||
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.hbase.security.UserProvider;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
|
@ -106,27 +107,33 @@ public final class ConnectionUtils {
|
|||
}
|
||||
|
||||
/**
|
||||
* Adapt a HConnection so that it can bypass the RPC layer (serialization,
|
||||
* deserialization, networking, etc..) -- i.e. short-circuit -- when talking to a local server.
|
||||
* @param conn the connection to adapt
|
||||
* Creates a short-circuit connection that can bypass the RPC layer (serialization,
|
||||
* deserialization, networking, etc..) when talking to a local server.
|
||||
* @param conf the current configuration
|
||||
* @param pool the thread pool to use for batch operations
|
||||
* @param user the user the connection is for
|
||||
* @param serverName the local server name
|
||||
* @param admin the admin interface of the local server
|
||||
* @param client the client interface of the local server
|
||||
* @return an adapted/decorated HConnection
|
||||
* @return an short-circuit connection.
|
||||
* @throws IOException
|
||||
*/
|
||||
public static ClusterConnection createShortCircuitHConnection(final Connection conn,
|
||||
final ServerName serverName, final AdminService.BlockingInterface admin,
|
||||
final ClientService.BlockingInterface client) {
|
||||
return new ConnectionAdapter(conn) {
|
||||
public static ClusterConnection createShortCircuitConnection(final Configuration conf,
|
||||
ExecutorService pool, User user, final ServerName serverName,
|
||||
final AdminService.BlockingInterface admin, final ClientService.BlockingInterface client)
|
||||
throws IOException {
|
||||
if (user == null) {
|
||||
user = UserProvider.instantiate(conf).getCurrent();
|
||||
}
|
||||
return new ConnectionImplementation(conf, pool, user) {
|
||||
@Override
|
||||
public AdminService.BlockingInterface getAdmin(
|
||||
ServerName sn, boolean getMaster) throws IOException {
|
||||
public AdminService.BlockingInterface getAdmin(ServerName sn, boolean getMaster)
|
||||
throws IOException {
|
||||
return serverName.equals(sn) ? admin : super.getAdmin(sn, getMaster);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClientService.BlockingInterface getClient(
|
||||
ServerName sn) throws IOException {
|
||||
public ClientService.BlockingInterface getClient(ServerName sn) throws IOException {
|
||||
return serverName.equals(sn) ? client : super.getClient(sn);
|
||||
}
|
||||
};
|
||||
|
|
|
@ -78,7 +78,6 @@ import org.apache.hadoop.hbase.YouAreDeadException;
|
|||
import org.apache.hadoop.hbase.ZNodeClearer;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.client.ClusterConnection;
|
||||
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||
import org.apache.hadoop.hbase.client.ConnectionUtils;
|
||||
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
|
||||
import org.apache.hadoop.hbase.conf.ConfigurationManager;
|
||||
|
@ -675,7 +674,7 @@ public class HRegionServer extends HasThread implements
|
|||
|
||||
/**
|
||||
* Create a 'smarter' HConnection, one that is capable of by-passing RPC if the request is to
|
||||
* the local server. Safe to use going to local or remote server.
|
||||
* the local server. Safe to use going to local or remote server.
|
||||
* Create this instance in a method can be intercepted and mocked in tests.
|
||||
* @throws IOException
|
||||
*/
|
||||
|
@ -684,8 +683,8 @@ public class HRegionServer extends HasThread implements
|
|||
// Create a cluster connection that when appropriate, can short-circuit and go directly to the
|
||||
// local server if the request is to the local server bypassing RPC. Can be used for both local
|
||||
// and remote invocations.
|
||||
return ConnectionUtils.createShortCircuitHConnection(
|
||||
ConnectionFactory.createConnection(conf), serverName, rpcServices, rpcServices);
|
||||
return ConnectionUtils.createShortCircuitConnection(conf, null, userProvider.getCurrent(),
|
||||
serverName, rpcServices, rpcServices);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -0,0 +1,85 @@
|
|||
/**
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
|
||||
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
@Category({ MediumTests.class, ClientTests.class })
|
||||
public class TestShortCircuitConnection {
|
||||
|
||||
private final static HBaseTestingUtility UTIL = new HBaseTestingUtility();
|
||||
|
||||
@BeforeClass
|
||||
public static void setUpBeforeClass() throws Exception {
|
||||
UTIL.startMiniCluster(1);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDownAfterClass() throws Exception {
|
||||
UTIL.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("deprecation")
|
||||
public void testShortCircuitConnection() throws IOException, InterruptedException {
|
||||
TableName tn = TableName.valueOf("testShortCircuitConnection");
|
||||
HTableDescriptor htd = UTIL.createTableDescriptor(tn);
|
||||
HColumnDescriptor hcd = new HColumnDescriptor(Bytes.toBytes("cf"));
|
||||
htd.addFamily(hcd);
|
||||
UTIL.createTable(htd, null);
|
||||
HRegionServer regionServer = UTIL.getRSForFirstRegionInTable(tn);
|
||||
ClusterConnection connection = regionServer.getConnection();
|
||||
HTableInterface tableIf = connection.getTable(tn);
|
||||
assertTrue(tableIf instanceof HTable);
|
||||
HTable table = (HTable) tableIf;
|
||||
assertTrue(table.getConnection() == connection);
|
||||
AdminService.BlockingInterface admin = connection.getAdmin(regionServer.getServerName());
|
||||
ClientService.BlockingInterface client = connection.getClient(regionServer.getServerName());
|
||||
assertTrue(admin instanceof RSRpcServices);
|
||||
assertTrue(client instanceof RSRpcServices);
|
||||
ServerName anotherSn = ServerName.valueOf(regionServer.getServerName().getHostAndPort(),
|
||||
EnvironmentEdgeManager.currentTime());
|
||||
admin = connection.getAdmin(anotherSn);
|
||||
client = connection.getClient(anotherSn);
|
||||
assertFalse(admin instanceof RSRpcServices);
|
||||
assertFalse(client instanceof RSRpcServices);
|
||||
assertTrue(connection.getAdmin().getConnection() == connection);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue