From b85857cf3c735d61b27a878b604de363f0ebb8b0 Mon Sep 17 00:00:00 2001 From: stack Date: Mon, 24 Aug 2015 12:29:46 -0700 Subject: [PATCH] HBASE-13480 ShortCircuitConnection doesn't short-circuit all calls as expected (Jingcheng Du and Josh Elser) --- .../hbase/client/ConnectionAdapter.java | 467 ------------------ .../hadoop/hbase/client/ConnectionUtils.java | 31 +- .../hbase/regionserver/HRegionServer.java | 7 +- .../client/TestShortCircuitConnection.java | 85 ++++ 4 files changed, 107 insertions(+), 483 deletions(-) delete mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestShortCircuitConnection.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java deleted file mode 100644 index 1d8a7939a6b..00000000000 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java +++ /dev/null @@ -1,467 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.client; - -import java.io.IOException; -import java.util.List; -import java.util.concurrent.ExecutorService; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HRegionLocation; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.MasterNotRunningException; -import org.apache.hadoop.hbase.RegionLocations; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.ZooKeeperConnectionException; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy; -import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback; -import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService; -import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService; -import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MasterService; - -/** - * An internal class that delegates to an {@link HConnection} instance. - * A convenience to override when customizing method implementations. - * - * - * @see ConnectionUtils#createShortCircuitHConnection(Connection, ServerName, - * AdminService.BlockingInterface, ClientService.BlockingInterface) for case where we make - * Connections skip RPC if request is to local server. - */ -@InterfaceAudience.Private -@SuppressWarnings("deprecation") -//NOTE: DO NOT make this class public. It was made package-private on purpose. -abstract class ConnectionAdapter implements ClusterConnection { - - private final ClusterConnection wrappedConnection; - - public ConnectionAdapter(Connection c) { - wrappedConnection = (ClusterConnection)c; - } - - @Override - public void abort(String why, Throwable e) { - wrappedConnection.abort(why, e); - } - - @Override - public boolean isAborted() { - return wrappedConnection.isAborted(); - } - - @Override - public void close() throws IOException { - wrappedConnection.close(); - } - - @Override - public Configuration getConfiguration() { - return wrappedConnection.getConfiguration(); - } - - @Override - public HTableInterface getTable(String tableName) throws IOException { - return wrappedConnection.getTable(tableName); - } - - @Override - public HTableInterface getTable(byte[] tableName) throws IOException { - return wrappedConnection.getTable(tableName); - } - - @Override - public HTableInterface getTable(TableName tableName) throws IOException { - return wrappedConnection.getTable(tableName); - } - - @Override - public HTableInterface getTable(String tableName, ExecutorService pool) - throws IOException { - return wrappedConnection.getTable(tableName, pool); - } - - @Override - public HTableInterface getTable(byte[] tableName, ExecutorService pool) - throws IOException { - return wrappedConnection.getTable(tableName, pool); - } - - @Override - public HTableInterface getTable(TableName tableName, ExecutorService pool) - throws IOException { - return wrappedConnection.getTable(tableName, pool); - } - - @Override - public BufferedMutator getBufferedMutator(BufferedMutatorParams params) - throws IOException { - return wrappedConnection.getBufferedMutator(params); - } - - @Override - public BufferedMutator getBufferedMutator(TableName tableName) throws IOException { - return wrappedConnection.getBufferedMutator(tableName); - } - - @Override - public RegionLocator getRegionLocator(TableName tableName) throws IOException { - return wrappedConnection.getRegionLocator(tableName); - } - - @Override - public Admin getAdmin() throws IOException { - return wrappedConnection.getAdmin(); - } - - @Override - public boolean isMasterRunning() throws MasterNotRunningException, - ZooKeeperConnectionException { - return wrappedConnection.isMasterRunning(); - } - - @Override - public boolean isTableEnabled(TableName tableName) throws IOException { - return wrappedConnection.isTableEnabled(tableName); - } - - @Override - public boolean isTableEnabled(byte[] tableName) throws IOException { - return wrappedConnection.isTableEnabled(tableName); - } - - @Override - public boolean isTableDisabled(TableName tableName) throws IOException { - return wrappedConnection.isTableDisabled(tableName); - } - - @Override - public boolean isTableDisabled(byte[] tableName) throws IOException { - return wrappedConnection.isTableDisabled(tableName); - } - - @Override - public boolean isTableAvailable(TableName tableName) throws IOException { - return wrappedConnection.isTableAvailable(tableName); - } - - @Override - public boolean isTableAvailable(byte[] tableName) throws IOException { - return wrappedConnection.isTableAvailable(tableName); - } - - @Override - public boolean isTableAvailable(TableName tableName, byte[][] splitKeys) - throws IOException { - return wrappedConnection.isTableAvailable(tableName, splitKeys); - } - - @Override - public boolean isTableAvailable(byte[] tableName, byte[][] splitKeys) - throws IOException { - return wrappedConnection.isTableAvailable(tableName, splitKeys); - } - - @Override - public TableState getTableState(TableName tableName) throws IOException { - return wrappedConnection.getTableState(tableName); - } - - @Override - public HTableDescriptor[] listTables() throws IOException { - return wrappedConnection.listTables(); - } - - @Override - public String[] getTableNames() throws IOException { - return wrappedConnection.getTableNames(); - } - - @Override - public TableName[] listTableNames() throws IOException { - return wrappedConnection.listTableNames(); - } - - @Override - public HTableDescriptor getHTableDescriptor(TableName tableName) - throws IOException { - return wrappedConnection.getHTableDescriptor(tableName); - } - - @Override - public HTableDescriptor getHTableDescriptor(byte[] tableName) - throws IOException { - return wrappedConnection.getHTableDescriptor(tableName); - } - - @Override - public HRegionLocation locateRegion(TableName tableName, byte[] row) - throws IOException { - return wrappedConnection.locateRegion(tableName, row); - } - - @Override - public HRegionLocation locateRegion(byte[] tableName, byte[] row) - throws IOException { - return wrappedConnection.locateRegion(tableName, row); - } - - @Override - public RegionLocations locateRegion(TableName tableName, byte[] row, boolean useCache, - boolean retry) throws IOException { - return wrappedConnection.locateRegion(tableName, row, useCache, retry); - } - - @Override - public void clearRegionCache() { - wrappedConnection.clearRegionCache(); - } - - @Override - public void clearRegionCache(TableName tableName) { - wrappedConnection.clearRegionCache(tableName); - } - - @Override - public void clearRegionCache(byte[] tableName) { - wrappedConnection.clearRegionCache(tableName); - } - - @Override - public void deleteCachedRegionLocation(HRegionLocation location) { - wrappedConnection.deleteCachedRegionLocation(location); - } - - @Override - public HRegionLocation relocateRegion(TableName tableName, byte[] row) - throws IOException { - return wrappedConnection.relocateRegion(tableName, row); - } - - @Override - public HRegionLocation relocateRegion(byte[] tableName, byte[] row) - throws IOException { - return wrappedConnection.relocateRegion(tableName, row); - } - - @Override - public void updateCachedLocations(TableName tableName, byte[] rowkey, - Object exception, HRegionLocation source) { - wrappedConnection.updateCachedLocations(tableName, rowkey, exception, source); - } - - @Override - public void updateCachedLocations(TableName tableName, byte[] regionName, byte[] rowkey, - Object exception, ServerName source) { - wrappedConnection.updateCachedLocations(tableName, regionName, rowkey, exception, source); - } - - @Override - public void updateCachedLocations(byte[] tableName, byte[] rowkey, - Object exception, HRegionLocation source) { - wrappedConnection.updateCachedLocations(tableName, rowkey, exception, source); - } - - @Override - public HRegionLocation locateRegion(byte[] regionName) throws IOException { - return wrappedConnection.locateRegion(regionName); - } - - @Override - public List locateRegions(TableName tableName) - throws IOException { - return wrappedConnection.locateRegions(tableName); - } - - @Override - public List locateRegions(byte[] tableName) - throws IOException { - return wrappedConnection.locateRegions(tableName); - } - - @Override - public List locateRegions(TableName tableName, - boolean useCache, boolean offlined) throws IOException { - return wrappedConnection.locateRegions(tableName, useCache, offlined); - } - - @Override - public List locateRegions(byte[] tableName, - boolean useCache, boolean offlined) throws IOException { - return wrappedConnection.locateRegions(tableName, useCache, offlined); - } - - @Override - public RegionLocations locateRegion(TableName tableName, byte[] row, boolean useCache, - boolean retry, int replicaId) throws IOException { - return wrappedConnection.locateRegion(tableName, row, useCache, retry, replicaId); - } - - @Override - public RegionLocations relocateRegion(TableName tableName, byte[] row, int replicaId) - throws IOException { - return wrappedConnection.relocateRegion(tableName, row, replicaId); - } - - @Override - public MasterService.BlockingInterface getMaster() throws IOException { - return wrappedConnection.getMaster(); - } - - @Override - public AdminService.BlockingInterface getAdmin( - ServerName serverName) throws IOException { - return wrappedConnection.getAdmin(serverName); - } - - @Override - public ClientService.BlockingInterface getClient( - ServerName serverName) throws IOException { - return wrappedConnection.getClient(serverName); - } - - @Override - public AdminService.BlockingInterface getAdmin( - ServerName serverName, boolean getMaster) throws IOException { - return wrappedConnection.getAdmin(serverName, getMaster); - } - - @Override - public HRegionLocation getRegionLocation(TableName tableName, byte[] row, - boolean reload) throws IOException { - return wrappedConnection.getRegionLocation(tableName, row, reload); - } - - @Override - public HRegionLocation getRegionLocation(byte[] tableName, byte[] row, - boolean reload) throws IOException { - return wrappedConnection.getRegionLocation(tableName, row, reload); - } - - @Override - public void processBatch(List actions, TableName tableName, - ExecutorService pool, Object[] results) throws IOException, - InterruptedException { - wrappedConnection.processBatch(actions, tableName, pool, results); - } - - @Override - public void processBatch(List actions, byte[] tableName, - ExecutorService pool, Object[] results) throws IOException, - InterruptedException { - wrappedConnection.processBatch(actions, tableName, pool, results); - } - - @Override - public void processBatchCallback(List list, - TableName tableName, ExecutorService pool, Object[] results, - Callback callback) throws IOException, InterruptedException { - wrappedConnection.processBatchCallback(list, tableName, pool, results, callback); - } - - @Override - public void processBatchCallback(List list, - byte[] tableName, ExecutorService pool, Object[] results, - Callback callback) throws IOException, InterruptedException { - wrappedConnection.processBatchCallback(list, tableName, pool, results, callback); - } - - @Override - public void setRegionCachePrefetch(TableName tableName, boolean enable) { - wrappedConnection.setRegionCachePrefetch(tableName, enable); - } - - @Override - public void setRegionCachePrefetch(byte[] tableName, boolean enable) { - wrappedConnection.setRegionCachePrefetch(tableName, enable); - } - - @Override - public boolean getRegionCachePrefetch(TableName tableName) { - return wrappedConnection.getRegionCachePrefetch(tableName); - } - - @Override - public boolean getRegionCachePrefetch(byte[] tableName) { - return wrappedConnection.getRegionCachePrefetch(tableName); - } - - @Override - public int getCurrentNrHRS() throws IOException { - return wrappedConnection.getCurrentNrHRS(); - } - - @Override - public HTableDescriptor[] getHTableDescriptorsByTableName( - List tableNames) throws IOException { - return wrappedConnection.getHTableDescriptorsByTableName(tableNames); - } - - @Override - public HTableDescriptor[] getHTableDescriptors(List tableNames) - throws IOException { - return wrappedConnection.getHTableDescriptors(tableNames); - } - - @Override - public boolean isClosed() { - return wrappedConnection.isClosed(); - } - - @Override - public void clearCaches(ServerName sn) { - wrappedConnection.clearCaches(sn); - } - - @Override - public MasterKeepAliveConnection getKeepAliveMasterService() - throws MasterNotRunningException { - return wrappedConnection.getKeepAliveMasterService(); - } - - @Override - public boolean isDeadServer(ServerName serverName) { - return wrappedConnection.isDeadServer(serverName); - } - - @Override - public NonceGenerator getNonceGenerator() { - return wrappedConnection.getNonceGenerator(); - } - - @Override - public AsyncProcess getAsyncProcess() { - return wrappedConnection.getAsyncProcess(); - } - - @Override - public RpcRetryingCallerFactory getNewRpcRetryingCallerFactory(Configuration conf) { - return wrappedConnection.getNewRpcRetryingCallerFactory(conf); - } - - @Override - public ServerStatisticTracker getStatisticsTracker() { - return wrappedConnection.getStatisticsTracker(); - } - - @Override - public ClientBackoffPolicy getBackoffPolicy() { - return wrappedConnection.getBackoffPolicy(); - } -} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java index 323915b95fa..82c2fc481d5 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService; import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.security.UserProvider; import com.google.common.annotations.VisibleForTesting; @@ -106,27 +107,33 @@ public final class ConnectionUtils { } /** - * Adapt a HConnection so that it can bypass the RPC layer (serialization, - * deserialization, networking, etc..) -- i.e. short-circuit -- when talking to a local server. - * @param conn the connection to adapt + * Creates a short-circuit connection that can bypass the RPC layer (serialization, + * deserialization, networking, etc..) when talking to a local server. + * @param conf the current configuration + * @param pool the thread pool to use for batch operations + * @param user the user the connection is for * @param serverName the local server name * @param admin the admin interface of the local server * @param client the client interface of the local server - * @return an adapted/decorated HConnection + * @return an short-circuit connection. + * @throws IOException */ - public static ClusterConnection createShortCircuitHConnection(final Connection conn, - final ServerName serverName, final AdminService.BlockingInterface admin, - final ClientService.BlockingInterface client) { - return new ConnectionAdapter(conn) { + public static ClusterConnection createShortCircuitConnection(final Configuration conf, + ExecutorService pool, User user, final ServerName serverName, + final AdminService.BlockingInterface admin, final ClientService.BlockingInterface client) + throws IOException { + if (user == null) { + user = UserProvider.instantiate(conf).getCurrent(); + } + return new ConnectionImplementation(conf, pool, user) { @Override - public AdminService.BlockingInterface getAdmin( - ServerName sn, boolean getMaster) throws IOException { + public AdminService.BlockingInterface getAdmin(ServerName sn, boolean getMaster) + throws IOException { return serverName.equals(sn) ? admin : super.getAdmin(sn, getMaster); } @Override - public ClientService.BlockingInterface getClient( - ServerName sn) throws IOException { + public ClientService.BlockingInterface getClient(ServerName sn) throws IOException { return serverName.equals(sn) ? client : super.getClient(sn); } }; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 315659a8ae9..5312338210f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -78,7 +78,6 @@ import org.apache.hadoop.hbase.YouAreDeadException; import org.apache.hadoop.hbase.ZNodeClearer; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.ClusterConnection; -import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.ConnectionUtils; import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory; import org.apache.hadoop.hbase.conf.ConfigurationManager; @@ -675,7 +674,7 @@ public class HRegionServer extends HasThread implements /** * Create a 'smarter' HConnection, one that is capable of by-passing RPC if the request is to - * the local server. Safe to use going to local or remote server. + * the local server. Safe to use going to local or remote server. * Create this instance in a method can be intercepted and mocked in tests. * @throws IOException */ @@ -684,8 +683,8 @@ public class HRegionServer extends HasThread implements // Create a cluster connection that when appropriate, can short-circuit and go directly to the // local server if the request is to the local server bypassing RPC. Can be used for both local // and remote invocations. - return ConnectionUtils.createShortCircuitHConnection( - ConnectionFactory.createConnection(conf), serverName, rpcServices, rpcServices); + return ConnectionUtils.createShortCircuitConnection(conf, null, userProvider.getCurrent(), + serverName, rpcServices, rpcServices); } /** diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestShortCircuitConnection.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestShortCircuitConnection.java new file mode 100644 index 00000000000..e84d34c0707 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestShortCircuitConnection.java @@ -0,0 +1,85 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; + +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.RSRpcServices; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ MediumTests.class, ClientTests.class }) +public class TestShortCircuitConnection { + + private final static HBaseTestingUtility UTIL = new HBaseTestingUtility(); + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + UTIL.startMiniCluster(1); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + UTIL.shutdownMiniCluster(); + } + + @Test + @SuppressWarnings("deprecation") + public void testShortCircuitConnection() throws IOException, InterruptedException { + TableName tn = TableName.valueOf("testShortCircuitConnection"); + HTableDescriptor htd = UTIL.createTableDescriptor(tn); + HColumnDescriptor hcd = new HColumnDescriptor(Bytes.toBytes("cf")); + htd.addFamily(hcd); + UTIL.createTable(htd, null); + HRegionServer regionServer = UTIL.getRSForFirstRegionInTable(tn); + ClusterConnection connection = regionServer.getConnection(); + HTableInterface tableIf = connection.getTable(tn); + assertTrue(tableIf instanceof HTable); + HTable table = (HTable) tableIf; + assertTrue(table.getConnection() == connection); + AdminService.BlockingInterface admin = connection.getAdmin(regionServer.getServerName()); + ClientService.BlockingInterface client = connection.getClient(regionServer.getServerName()); + assertTrue(admin instanceof RSRpcServices); + assertTrue(client instanceof RSRpcServices); + ServerName anotherSn = ServerName.valueOf(regionServer.getServerName().getHostAndPort(), + EnvironmentEdgeManager.currentTime()); + admin = connection.getAdmin(anotherSn); + client = connection.getClient(anotherSn); + assertFalse(admin instanceof RSRpcServices); + assertFalse(client instanceof RSRpcServices); + assertTrue(connection.getAdmin().getConnection() == connection); + } +}