From 05200976110135abb60f9b879b9b830671c07141 Mon Sep 17 00:00:00 2001 From: Enis Soztutar Date: Wed, 23 Mar 2016 12:30:41 -0700 Subject: [PATCH] HBASE-15295 MutateTableAccess.multiMutate() does not get high priority causing a deadlock --- .../org/apache/hadoop/hbase/HRegionInfo.java | 1 + .../hbase/client/BufferedMutatorImpl.java | 2 +- .../hbase/client/ClusterConnection.java | 20 +- ...tion.java => ConnectionConfiguration.java} | 12 +- .../client/ConnectionImplementation.java | 69 ++- .../hadoop/hbase/client/HBaseAdmin.java | 511 ++++++++++++------ .../apache/hadoop/hbase/client/HTable.java | 55 +- .../hadoop/hbase/ipc/AbstractRpcClient.java | 2 +- .../hbase/ipc/CoprocessorRpcChannel.java | 11 +- .../ipc/MasterCoprocessorRpcChannel.java | 18 +- .../ipc/RegionCoprocessorRpcChannel.java | 46 +- .../RegionServerCoprocessorRpcChannel.java | 10 +- .../hadoop/hbase/protobuf/ProtobufUtil.java | 168 +++--- .../security/access/AccessControlClient.java | 46 +- .../hbase/zookeeper/MetaTableLocator.java | 18 +- .../hbase/client/TestSnapshotFromAdmin.java | 31 +- .../hadoop/hbase/DistributedHBaseCluster.java | 4 +- .../tmpl/regionserver/RSStatusTmpl.jamon | 2 +- .../apache/hadoop/hbase/ipc/CallRunner.java | 5 +- .../apache/hadoop/hbase/ipc/RpcScheduler.java | 2 +- .../apache/hadoop/hbase/ipc/RpcServer.java | 12 +- .../master/RegionPlacementMaintainer.java | 2 +- .../hadoop/hbase/master/ServerManager.java | 34 +- .../apache/hadoop/hbase/MiniHBaseCluster.java | 4 +- .../hadoop/hbase/TestGlobalMemStoreSize.java | 14 +- .../hadoop/hbase/TestMetaTableAccessor.java | 79 +++ .../hadoop/hbase/TestMetaTableLocator.java | 8 +- .../client/HConnectionTestingUtility.java | 5 + .../hadoop/hbase/client/TestAdmin1.java | 4 +- .../hbase/client/TestFromClientSide.java | 6 +- .../hbase/client/TestFromClientSide3.java | 10 +- .../hbase/client/TestHBaseAdminNoCluster.java | 10 + .../client/TestScannersFromClientSide.java | 6 +- .../hbase/ipc/DelegatingRpcScheduler.java | 76 +++ ...estLoadIncrementalHFilesSplitRecovery.java | 3 +- .../hbase/master/TestClockSkewDetection.java | 20 +- .../TestRegionServerNoMaster.java | 2 +- .../hbase/security/access/SecureTestUtil.java | 12 +- .../security/access/TestAccessController.java | 20 +- .../access/TestNamespaceCommands.java | 13 +- 40 files changed, 957 insertions(+), 416 deletions(-) rename hbase-client/src/main/java/org/apache/hadoop/hbase/client/{TableConfiguration.java => ConnectionConfiguration.java} (93%) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/DelegatingRpcScheduler.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java index 13ba23d5bb8..71f87f7739d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java @@ -202,6 +202,7 @@ public class HRegionInfo implements Comparable { public final static byte[] HIDDEN_START_KEY = Bytes.toBytes("hidden-start-key"); /** HRegionInfo for first meta region */ + // TODO: How come Meta regions still do not have encoded region names? Fix. public static final HRegionInfo FIRST_META_REGIONINFO = new HRegionInfo(1L, TableName.META_TABLE_NAME); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java index ef3f7e94ace..01aaec541af 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java @@ -88,7 +88,7 @@ public class BufferedMutatorImpl implements BufferedMutator { this.pool = params.getPool(); this.listener = params.getListener(); - TableConfiguration tableConf = new TableConfiguration(conf); + ConnectionConfiguration tableConf = new ConnectionConfiguration(conf); this.writeBufferSize = params.getWriteBufferSize() != BufferedMutatorParams.UNSET ? params.getWriteBufferSize() : tableConf.getWriteBufferSize(); this.maxKeyValueSize = params.getMaxKeyValueSize() != BufferedMutatorParams.UNSET ? diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java index 45589beb28e..d348ffc4806 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java @@ -31,11 +31,12 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MasterService; -/** Internal methods on HConnection that should not be used by user code. */ +/** Internal methods on Connection that should not be used by user code. */ @InterfaceAudience.Private // NOTE: Although this class is public, this class is meant to be used directly from internal // classes and unit tests only. @@ -287,7 +288,22 @@ public interface ClusterConnection extends HConnection { * @return RpcRetryingCallerFactory */ RpcRetryingCallerFactory getNewRpcRetryingCallerFactory(Configuration conf); - + + /** + * @return Connection's RpcRetryingCallerFactory instance + */ + RpcRetryingCallerFactory getRpcRetryingCallerFactory(); + + /** + * @return Connection's RpcControllerFactory instance + */ + RpcControllerFactory getRpcControllerFactory(); + + /** + * @return a ConnectionConfiguration object holding parsed configuration values + */ + ConnectionConfiguration getConnectionConfiguration(); + /** * @return the current statistics tracker associated with this connection */ diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableConfiguration.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java similarity index 93% rename from hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableConfiguration.java rename to hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java index 1113cfd517a..35bebae5970 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableConfiguration.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java @@ -18,15 +18,15 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; import com.google.common.annotations.VisibleForTesting; /** - * + * Configuration parameters for the connection. * Configuration is a heavy weight registry that does a lot of string operations and regex matching. * Method calls into Configuration account for high CPU usage and have huge performance impact. - * This class caches the value in the TableConfiguration object to improve performance. + * This class caches connection-related configuration values in the ConnectionConfiguration + * object so that expensive conf.getXXX() calls are avoided every time HTable, etc is instantiated. * see HBASE-12128 - * */ @InterfaceAudience.Private -public class TableConfiguration { +public class ConnectionConfiguration { public static final String WRITE_BUFFER_SIZE_KEY = "hbase.client.write.buffer"; public static final long WRITE_BUFFER_SIZE_DEFAULT = 2097152; @@ -50,7 +50,7 @@ public class TableConfiguration { * Constructor * @param conf Configuration object */ - TableConfiguration(Configuration conf) { + ConnectionConfiguration(Configuration conf) { this.writeBufferSize = conf.getLong(WRITE_BUFFER_SIZE_KEY, WRITE_BUFFER_SIZE_DEFAULT); this.metaOperationTimeout = conf.getInt( @@ -88,7 +88,7 @@ public class TableConfiguration { * In real usage, we should read the configuration from the Configuration object. */ @VisibleForTesting - protected TableConfiguration() { + protected ConnectionConfiguration() { this.writeBufferSize = WRITE_BUFFER_SIZE_DEFAULT; this.metaOperationTimeout = HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT; this.operationTimeout = HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java index fd4dc6db145..ecaf18b5650 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java @@ -25,6 +25,26 @@ import com.google.protobuf.BlockingRpcChannel; import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; +import java.io.Closeable; +import java.io.IOException; +import java.io.InterruptedIOException; +import java.lang.reflect.UndeclaredThrowableException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.annotation.Nullable; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -78,25 +98,6 @@ import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.ipc.RemoteException; import org.apache.zookeeper.KeeperException; -import javax.annotation.Nullable; - -import java.io.Closeable; -import java.io.IOException; -import java.io.InterruptedIOException; -import java.lang.reflect.UndeclaredThrowableException; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.Date; -import java.util.List; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; /** * Main implementation of {@link Connection} and {@link ClusterConnection} interfaces. @@ -158,7 +159,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable { // cache the configuration value for tables so that we can avoid calling // the expensive Configuration to fetch the value multiple times. - private final TableConfiguration tableConfig; + private final ConnectionConfiguration connectionConfig; // Client rpc instance. private RpcClient rpcClient; @@ -190,14 +191,14 @@ class ConnectionImplementation implements ClusterConnection, Closeable { this.conf = conf; this.user = user; this.batchPool = pool; - this.tableConfig = new TableConfiguration(conf); + this.connectionConfig = new ConnectionConfiguration(conf); this.closed = false; this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE, HConstants.DEFAULT_HBASE_CLIENT_PAUSE); this.useMetaReplicas = conf.getBoolean(HConstants.USE_META_REPLICAS, HConstants.DEFAULT_USE_META_REPLICAS); // how many times to try, one more than max *retry* time - this.numTries = tableConfig.getRetriesNumber() + 1; + this.numTries = connectionConfig.getRetriesNumber() + 1; this.rpcTimeout = conf.getInt( HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT); @@ -306,7 +307,8 @@ class ConnectionImplementation implements ClusterConnection, Closeable { @Override public HTableInterface getTable(TableName tableName, ExecutorService pool) throws IOException { - return new HTable(tableName, this, tableConfig, rpcCallerFactory, rpcControllerFactory, pool); + return new HTable(tableName, this, connectionConfig, + rpcCallerFactory, rpcControllerFactory, pool); } @Override @@ -318,10 +320,10 @@ class ConnectionImplementation implements ClusterConnection, Closeable { params.pool(HTable.getDefaultExecutor(getConfiguration())); } if (params.getWriteBufferSize() == BufferedMutatorParams.UNSET) { - params.writeBufferSize(tableConfig.getWriteBufferSize()); + params.writeBufferSize(connectionConfig.getWriteBufferSize()); } if (params.getMaxKeyValueSize() == BufferedMutatorParams.UNSET) { - params.maxKeyValueSize(tableConfig.getMaxKeyValueSize()); + params.maxKeyValueSize(connectionConfig.getMaxKeyValueSize()); } return new BufferedMutatorImpl(this, rpcCallerFactory, rpcControllerFactory, params); } @@ -2281,4 +2283,19 @@ class ConnectionImplementation implements ClusterConnection, Closeable { public boolean hasCellBlockSupport() { return this.rpcClient.hasCellBlockSupport(); } -} \ No newline at end of file + + @Override + public ConnectionConfiguration getConnectionConfiguration() { + return this.connectionConfig; + } + + @Override + public RpcRetryingCallerFactory getRpcRetryingCallerFactory() { + return this.rpcCallerFactory; + } + + @Override + public RpcControllerFactory getRpcControllerFactory() { + return this.rpcControllerFactory; + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java index c2a0bb86958..c1d07aeeaca 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java @@ -68,7 +68,9 @@ import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.exceptions.TimeoutIOException; import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; import org.apache.hadoop.hbase.ipc.MasterCoprocessorRpcChannel; +import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; import org.apache.hadoop.hbase.ipc.RegionServerCoprocessorRpcChannel; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.RequestConverter; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService; @@ -217,6 +219,7 @@ public class HBaseAdmin implements Admin { private int operationTimeout; private RpcRetryingCallerFactory rpcCallerFactory; + private RpcControllerFactory rpcControllerFactory; private NonceGenerator ng; @@ -229,6 +232,7 @@ public class HBaseAdmin implements Admin { this.conf = connection.getConfiguration(); this.connection = connection; + // TODO: receive ConnectionConfiguration here rather than re-parsing these configs every time. this.pause = this.conf.getLong(HConstants.HBASE_CLIENT_PAUSE, HConstants.DEFAULT_HBASE_CLIENT_PAUSE); this.numRetries = this.conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, @@ -240,7 +244,8 @@ public class HBaseAdmin implements Admin { this.syncWaitTimeout = this.conf.getInt( "hbase.client.sync.wait.timeout.msec", 10 * 60000); // 10min - this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(this.conf); + this.rpcCallerFactory = connection.getRpcRetryingCallerFactory(); + this.rpcControllerFactory = connection.getRpcControllerFactory(); this.ng = this.connection.getNonceGenerator(); } @@ -266,17 +271,19 @@ public class HBaseAdmin implements Admin { @Override public Future abortProcedureAsync( - final long procId, - final boolean mayInterruptIfRunning) throws IOException { + final long procId, + final boolean mayInterruptIfRunning) throws IOException { Boolean abortProcResponse = executeCallable( new MasterCallable(getConnection()) { - @Override - public AbortProcedureResponse call(int callTimeout) throws ServiceException { - AbortProcedureRequest abortProcRequest = - AbortProcedureRequest.newBuilder().setProcId(procId).build(); - return master.abortProcedure(null,abortProcRequest); - } - }).getIsProcedureAborted(); + @Override + public AbortProcedureResponse call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + AbortProcedureRequest abortProcRequest = + AbortProcedureRequest.newBuilder().setProcId(procId).build(); + return master.abortProcedure(controller, abortProcRequest); + } + }).getIsProcedureAborted(); AbortProcedureFuture abortProcFuture = new AbortProcedureFuture(this, procId, abortProcResponse); @@ -342,9 +349,11 @@ public class HBaseAdmin implements Admin { return executeCallable(new MasterCallable(getConnection()) { @Override public HTableDescriptor[] call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); GetTableDescriptorsRequest req = RequestConverter.buildGetTableDescriptorsRequest(pattern, includeSysTables); - return ProtobufUtil.getHTableDescriptorArray(master.getTableDescriptors(null, req)); + return ProtobufUtil.getHTableDescriptorArray(master.getTableDescriptors(controller, req)); } }); } @@ -376,9 +385,11 @@ public class HBaseAdmin implements Admin { return executeCallable(new MasterCallable(getConnection()) { @Override public TableName[] call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); GetTableNamesRequest req = RequestConverter.buildGetTableNamesRequest(pattern, includeSysTables); - return ProtobufUtil.getTableNameArray(master.getTableNames(null, req) + return ProtobufUtil.getTableNameArray(master.getTableNames(controller, req) .getTableNamesList()); } }); @@ -392,19 +403,23 @@ public class HBaseAdmin implements Admin { @Override public HTableDescriptor getTableDescriptor(final TableName tableName) throws IOException { - return getTableDescriptor(tableName, getConnection(), rpcCallerFactory, operationTimeout); + return getTableDescriptor(tableName, getConnection(), rpcCallerFactory, rpcControllerFactory, + operationTimeout); } static HTableDescriptor getTableDescriptor(final TableName tableName, HConnection connection, - RpcRetryingCallerFactory rpcCallerFactory, int operationTimeout) throws IOException { + RpcRetryingCallerFactory rpcCallerFactory, final RpcControllerFactory rpcControllerFactory, + int operationTimeout) throws IOException { if (tableName == null) return null; HTableDescriptor htd = executeCallable(new MasterCallable(connection) { @Override public HTableDescriptor call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); GetTableDescriptorsResponse htds; GetTableDescriptorsRequest req = RequestConverter.buildGetTableDescriptorsRequest(tableName); - htds = master.getTableDescriptors(null, req); + htds = master.getTableDescriptors(controller, req); if (!htds.getTableSchemaList().isEmpty()) { return HTableDescriptor.convert(htds.getTableSchemaList().get(0)); @@ -483,14 +498,17 @@ public class HBaseAdmin implements Admin { } CreateTableResponse response = executeCallable( - new MasterCallable(getConnection()) { - @Override - public CreateTableResponse call(int callTimeout) throws ServiceException { - CreateTableRequest request = RequestConverter.buildCreateTableRequest( - desc, splitKeys, ng.getNonceGroup(), ng.newNonce()); - return master.createTable(null, request); - } - }); + new MasterCallable(getConnection()) { + @Override + public CreateTableResponse call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + controller.setPriority(desc.getTableName()); + CreateTableRequest request = RequestConverter.buildCreateTableRequest( + desc, splitKeys, ng.getNonceGroup(), ng.newNonce()); + return master.createTable(controller, request); + } + }); return new CreateTableFuture(this, desc, splitKeys, response); } @@ -532,14 +550,17 @@ public class HBaseAdmin implements Admin { @Override public Future deleteTableAsync(final TableName tableName) throws IOException { DeleteTableResponse response = executeCallable( - new MasterCallable(getConnection()) { - @Override - public DeleteTableResponse call(int callTimeout) throws ServiceException { - DeleteTableRequest req = - RequestConverter.buildDeleteTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()); - return master.deleteTable(null,req); - } - }); + new MasterCallable(getConnection()) { + @Override + public DeleteTableResponse call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + controller.setPriority(tableName); + DeleteTableRequest req = + RequestConverter.buildDeleteTableRequest(tableName, ng.getNonceGroup(),ng.newNonce()); + return master.deleteTable(controller,req); + } + }); return new DeleteTableFuture(this, tableName, response); } @@ -614,10 +635,13 @@ public class HBaseAdmin implements Admin { executeCallable(new MasterCallable(getConnection()) { @Override public TruncateTableResponse call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + controller.setPriority(tableName); LOG.info("Started truncating " + tableName); TruncateTableRequest req = RequestConverter.buildTruncateTableRequest( tableName, preserveSplits, ng.getNonceGroup(), ng.newNonce()); - return master.truncateTable(null, req); + return master.truncateTable(controller, req); } }); return new TruncateTableFuture(this, tableName, preserveSplits, response); @@ -714,15 +738,19 @@ public class HBaseAdmin implements Admin { public Future enableTableAsync(final TableName tableName) throws IOException { TableName.isLegalFullyQualifiedTableName(tableName.getName()); EnableTableResponse response = executeCallable( - new MasterCallable(getConnection()) { - @Override - public EnableTableResponse call(int callTimeout) throws ServiceException { - LOG.info("Started enable of " + tableName); - EnableTableRequest req = - RequestConverter.buildEnableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()); - return master.enableTable(null,req); - } - }); + new MasterCallable(getConnection()) { + @Override + public EnableTableResponse call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + controller.setPriority(tableName); + + LOG.info("Started enable of " + tableName); + EnableTableRequest req = + RequestConverter.buildEnableTableRequest(tableName, ng.getNonceGroup(),ng.newNonce()); + return master.enableTable(controller,req); + } + }); return new EnableTableFuture(this, tableName, response); } @@ -776,15 +804,20 @@ public class HBaseAdmin implements Admin { public Future disableTableAsync(final TableName tableName) throws IOException { TableName.isLegalFullyQualifiedTableName(tableName.getName()); DisableTableResponse response = executeCallable( - new MasterCallable(getConnection()) { - @Override - public DisableTableResponse call(int callTimeout) throws ServiceException { - LOG.info("Started disable of " + tableName); - DisableTableRequest req = - RequestConverter.buildDisableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()); - return master.disableTable(null, req); - } - }); + new MasterCallable(getConnection()) { + @Override + public DisableTableResponse call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + controller.setPriority(tableName); + + LOG.info("Started disable of " + tableName); + DisableTableRequest req = + RequestConverter.buildDisableTableRequest( + tableName, ng.getNonceGroup(), ng.newNonce()); + return master.disableTable(controller, req); + } + }); return new DisableTableFuture(this, tableName, response); } @@ -863,9 +896,13 @@ public class HBaseAdmin implements Admin { return executeCallable(new MasterCallable>(getConnection()) { @Override public Pair call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + controller.setPriority(tableName); + GetSchemaAlterStatusRequest req = RequestConverter .buildGetSchemaAlterStatusRequest(tableName); - GetSchemaAlterStatusResponse ret = master.getSchemaAlterStatus(null, req); + GetSchemaAlterStatusResponse ret = master.getSchemaAlterStatus(controller, req); Pair pair = new Pair<>(ret.getYetToUpdateRegions(), ret.getTotalRegions()); return pair; @@ -897,10 +934,14 @@ public class HBaseAdmin implements Admin { executeCallable(new MasterCallable(getConnection()) { @Override public AddColumnResponse call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + controller.setPriority(tableName); + AddColumnRequest req = RequestConverter.buildAddColumnRequest(tableName, columnFamily, ng.getNonceGroup(), ng.newNonce()); - return master.addColumn(null, req); + return master.addColumn(controller, req); } }); return new AddColumnFamilyFuture(this, tableName, response); @@ -938,10 +979,14 @@ public class HBaseAdmin implements Admin { executeCallable(new MasterCallable(getConnection()) { @Override public DeleteColumnResponse call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + controller.setPriority(tableName); + DeleteColumnRequest req = RequestConverter.buildDeleteColumnRequest(tableName, columnFamily, ng.getNonceGroup(), ng.newNonce()); - master.deleteColumn(null, req); + master.deleteColumn(controller, req); return null; } }); @@ -980,10 +1025,14 @@ public class HBaseAdmin implements Admin { executeCallable(new MasterCallable(getConnection()) { @Override public ModifyColumnResponse call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + controller.setPriority(tableName); + ModifyColumnRequest req = RequestConverter.buildModifyColumnRequest(tableName, columnFamily, ng.getNonceGroup(), ng.newNonce()); - master.modifyColumn(null, req); + master.modifyColumn(controller, req); return null; } }); @@ -1042,7 +1091,10 @@ public class HBaseAdmin implements Admin { CloseRegionRequest request = RequestConverter.buildCloseRegionRequest(sn, encodedRegionName); try { - CloseRegionResponse response = admin.closeRegion(null, request); + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + + // TODO: this does not do retries, it should. Set priority and timeout in controller + CloseRegionResponse response = admin.closeRegion(controller, request); boolean isRegionClosed = response.getClosed(); if (false == isRegionClosed) { LOG.error("Not able to close the region " + encodedRegionName + "."); @@ -1056,14 +1108,17 @@ public class HBaseAdmin implements Admin { @Override public void closeRegion(final ServerName sn, final HRegionInfo hri) throws IOException { AdminService.BlockingInterface admin = this.connection.getAdmin(sn); + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + // Close the region without updating zk state. - ProtobufUtil.closeRegion(admin, sn, hri.getRegionName()); + ProtobufUtil.closeRegion(controller, admin, sn, hri.getRegionName()); } @Override public List getOnlineRegions(final ServerName sn) throws IOException { AdminService.BlockingInterface admin = this.connection.getAdmin(sn); - return ProtobufUtil.getOnlineRegions(admin); + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + return ProtobufUtil.getOnlineRegions(controller, admin); } @Override @@ -1088,23 +1143,15 @@ public class HBaseAdmin implements Admin { } HRegionInfo hRegionInfo = regionServerPair.getFirst(); ServerName serverName = regionServerPair.getSecond(); + + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + AdminService.BlockingInterface admin = this.connection.getAdmin(serverName); FlushRegionRequest request = RequestConverter.buildFlushRegionRequest(hRegionInfo.getRegionName()); try { - admin.flushRegion(null, request); - } catch (ServiceException se) { - throw ProtobufUtil.getRemoteException(se); - } - } - - private void flush(final ServerName sn, final HRegionInfo hri) - throws IOException { - AdminService.BlockingInterface admin = this.connection.getAdmin(sn); - FlushRegionRequest request = - RequestConverter.buildFlushRegionRequest(hri.getRegionName()); - try { - admin.flushRegion(null, request); + // TODO: this does not do retries, it should. Set priority and timeout in controller + admin.flushRegion(controller, request); } catch (ServiceException se) { throw ProtobufUtil.getRemoteException(se); } @@ -1258,11 +1305,13 @@ public class HBaseAdmin implements Admin { private void compact(final ServerName sn, final HRegionInfo hri, final boolean major, final byte [] family) throws IOException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); AdminService.BlockingInterface admin = this.connection.getAdmin(sn); CompactRegionRequest request = RequestConverter.buildCompactRegionRequest(hri.getRegionName(), major, family); try { - admin.compactRegion(null, request); + // TODO: this does not do retries, it should. Set priority and timeout in controller + admin.compactRegion(controller, request); } catch (ServiceException se) { throw ProtobufUtil.getRemoteException(se); } @@ -1275,10 +1324,17 @@ public class HBaseAdmin implements Admin { executeCallable(new MasterCallable(getConnection()) { @Override public Void call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + // Hard to know the table name, at least check if meta + if (isMetaRegion(encodedRegionName)) { + controller.setPriority(TableName.META_TABLE_NAME); + } + try { MoveRegionRequest request = RequestConverter.buildMoveRegionRequest(encodedRegionName, destServerName); - master.moveRegion(null, request); + master.moveRegion(controller, request); } catch (DeserializationException de) { LOG.error("Could not parse destination server name: " + de); throw new ServiceException(new DoNotRetryIOException(de)); @@ -1288,6 +1344,11 @@ public class HBaseAdmin implements Admin { }); } + private boolean isMetaRegion(final byte[] regionName) { + return Bytes.equals(regionName, HRegionInfo.FIRST_META_REGIONINFO.getRegionName()) + || Bytes.equals(regionName, HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes()); + } + @Override public void assign(final byte[] regionName) throws MasterNotRunningException, ZooKeeperConnectionException, IOException { @@ -1295,9 +1356,16 @@ public class HBaseAdmin implements Admin { executeCallable(new MasterCallable(getConnection()) { @Override public Void call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + // Hard to know the table name, at least check if meta + if (isMetaRegion(regionName)) { + controller.setPriority(TableName.META_TABLE_NAME); + } + AssignRegionRequest request = RequestConverter.buildAssignRegionRequest(toBeAssigned); - master.assignRegion(null,request); + master.assignRegion(controller,request); return null; } }); @@ -1310,9 +1378,15 @@ public class HBaseAdmin implements Admin { executeCallable(new MasterCallable(getConnection()) { @Override public Void call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + // Hard to know the table name, at least check if meta + if (isMetaRegion(regionName)) { + controller.setPriority(TableName.META_TABLE_NAME); + } UnassignRegionRequest request = RequestConverter.buildUnassignRegionRequest(toBeUnassigned, force); - master.unassignRegion(null, request); + master.unassignRegion(controller, request); return null; } }); @@ -1324,7 +1398,13 @@ public class HBaseAdmin implements Admin { executeCallable(new MasterCallable(getConnection()) { @Override public Void call(int callTimeout) throws ServiceException { - master.offlineRegion(null,RequestConverter.buildOfflineRegionRequest(regionName)); + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + // Hard to know the table name, at least check if meta + if (isMetaRegion(regionName)) { + controller.setPriority(TableName.META_TABLE_NAME); + } + master.offlineRegion(controller, RequestConverter.buildOfflineRegionRequest(regionName)); return null; } }); @@ -1336,9 +1416,12 @@ public class HBaseAdmin implements Admin { return executeCallable(new MasterCallable(getConnection()) { @Override public Boolean call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + SetBalancerRunningRequest req = RequestConverter.buildSetBalancerRunningRequest(on, synchronous); - return master.setBalancerRunning(null, req).getPrevBalanceValue(); + return master.setBalancerRunning(controller, req).getPrevBalanceValue(); } }); } @@ -1348,7 +1431,11 @@ public class HBaseAdmin implements Admin { return executeCallable(new MasterCallable(getConnection()) { @Override public Boolean call(int callTimeout) throws ServiceException { - return master.balance(null, RequestConverter.buildBalanceRequest(false)).getBalancerRan(); + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + + return master.balance(controller, + RequestConverter.buildBalanceRequest(false)).getBalancerRan(); } }); } @@ -1358,7 +1445,11 @@ public class HBaseAdmin implements Admin { return executeCallable(new MasterCallable(getConnection()) { @Override public Boolean call(int callTimeout) throws ServiceException { - return master.balance(null, RequestConverter.buildBalanceRequest(force)).getBalancerRan(); + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + + return master.balance(controller, + RequestConverter.buildBalanceRequest(force)).getBalancerRan(); } }); } @@ -1368,8 +1459,11 @@ public class HBaseAdmin implements Admin { return executeCallable(new MasterCallable(getConnection()) { @Override public Boolean call(int callTimeout) throws ServiceException { - return master.isBalancerEnabled(null, RequestConverter.buildIsBalancerEnabledRequest()) - .getEnabled(); + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + + return master.isBalancerEnabled(controller, + RequestConverter.buildIsBalancerEnabledRequest()).getEnabled(); } }); } @@ -1379,7 +1473,10 @@ public class HBaseAdmin implements Admin { return executeCallable(new MasterCallable(getConnection()) { @Override public Boolean call(int callTimeout) throws ServiceException { - return master.normalize(null, + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + + return master.normalize(controller, RequestConverter.buildNormalizeRequest()).getNormalizerRan(); } }); @@ -1390,7 +1487,10 @@ public class HBaseAdmin implements Admin { return executeCallable(new MasterCallable(getConnection()) { @Override public Boolean call(int callTimeout) throws ServiceException { - return master.isNormalizerEnabled(null, + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + + return master.isNormalizerEnabled(controller, RequestConverter.buildIsNormalizerEnabledRequest()).getEnabled(); } }); @@ -1401,9 +1501,12 @@ public class HBaseAdmin implements Admin { return executeCallable(new MasterCallable(getConnection()) { @Override public Boolean call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + SetNormalizerRunningRequest req = RequestConverter.buildSetNormalizerRunningRequest(on); - return master.setNormalizerRunning(null, req).getPrevNormalizerValue(); + return master.setNormalizerRunning(controller, req).getPrevNormalizerValue(); } }); } @@ -1413,7 +1516,10 @@ public class HBaseAdmin implements Admin { return executeCallable(new MasterCallable(getConnection()) { @Override public Boolean call(int callTimeout) throws ServiceException { - return master.enableCatalogJanitor(null, + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + + return master.enableCatalogJanitor(controller, RequestConverter.buildEnableCatalogJanitorRequest(enable)).getPrevValue(); } }); @@ -1424,7 +1530,10 @@ public class HBaseAdmin implements Admin { return executeCallable(new MasterCallable(getConnection()) { @Override public Integer call(int callTimeout) throws ServiceException { - return master.runCatalogScan(null, + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + + return master.runCatalogScan(controller, RequestConverter.buildCatalogScanRequest()).getScanResult(); } }); @@ -1435,7 +1544,10 @@ public class HBaseAdmin implements Admin { return executeCallable(new MasterCallable(getConnection()) { @Override public Boolean call(int callTimeout) throws ServiceException { - return master.isCatalogJanitorEnabled(null, + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + + return master.isCatalogJanitorEnabled(controller, RequestConverter.buildIsCatalogJanitorEnabledRequest()).getValue(); } }); @@ -1480,11 +1592,14 @@ public class HBaseAdmin implements Admin { executeCallable(new MasterCallable(getConnection()) { @Override public Void call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + try { DispatchMergingRegionsRequest request = RequestConverter .buildDispatchMergingRegionsRequest(encodedNameOfRegionA, encodedNameOfRegionB, forcible); - master.dispatchMergingRegions(null, request); + master.dispatchMergingRegions(controller, request); } catch (DeserializationException de) { LOG.error("Could not parse destination server name: " + de); } @@ -1562,28 +1677,35 @@ public class HBaseAdmin implements Admin { Bytes.compareTo(hri.getStartKey(), splitPoint) == 0) { throw new IOException("should not give a splitkey which equals to startkey!"); } - // TODO: This is not executed via retries + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setPriority(hri.getTable()); + + // TODO: this does not do retries, it should. Set priority and timeout in controller AdminService.BlockingInterface admin = this.connection.getAdmin(sn); - ProtobufUtil.split(admin, hri, splitPoint); + ProtobufUtil.split(controller, admin, hri, splitPoint); } @Override public Future modifyTable(final TableName tableName, final HTableDescriptor htd) - throws IOException { + throws IOException { if (!tableName.equals(htd.getTableName())) { throw new IllegalArgumentException("the specified table name '" + tableName + "' doesn't match with the HTD one: " + htd.getTableName()); } ModifyTableResponse response = executeCallable( - new MasterCallable(getConnection()) { - @Override - public ModifyTableResponse call(int callTimeout) throws ServiceException { - ModifyTableRequest request = RequestConverter.buildModifyTableRequest( - tableName, htd, ng.getNonceGroup(), ng.newNonce()); - return master.modifyTable(null, request); - } - }); + new MasterCallable(getConnection()) { + @Override + public ModifyTableResponse call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + controller.setPriority(tableName); + + ModifyTableRequest request = RequestConverter.buildModifyTableRequest( + tableName, htd, ng.getNonceGroup(), ng.newNonce()); + return master.modifyTable(controller, request); + } + }); return new ModifyTableFuture(this, tableName, response); } @@ -1715,7 +1837,10 @@ public class HBaseAdmin implements Admin { executeCallable(new MasterCallable(getConnection()) { @Override public Void call(int callTimeout) throws ServiceException { - master.shutdown(null,ShutdownRequest.newBuilder().build()); + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + controller.setPriority(HConstants.HIGH_QOS); + master.shutdown(controller, ShutdownRequest.newBuilder().build()); return null; } }); @@ -1726,7 +1851,10 @@ public class HBaseAdmin implements Admin { executeCallable(new MasterCallable(getConnection()) { @Override public Void call(int callTimeout) throws ServiceException { - master.stopMaster(null, StopMasterRequest.newBuilder().build()); + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + controller.setPriority(HConstants.HIGH_QOS); + master.stopMaster(controller, StopMasterRequest.newBuilder().build()); return null; } }); @@ -1741,8 +1869,12 @@ public class HBaseAdmin implements Admin { this.connection.getAdmin(ServerName.valueOf(hostname, port, 0)); StopServerRequest request = RequestConverter.buildStopServerRequest( "Called by admin client " + this.connection.toString()); + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + + controller.setPriority(HConstants.HIGH_QOS); try { - admin.stopServer(null, request); + // TODO: this does not do retries, it should. Set priority and timeout in controller + admin.stopServer(controller, request); } catch (ServiceException se) { throw ProtobufUtil.getRemoteException(se); } @@ -1753,8 +1885,10 @@ public class HBaseAdmin implements Admin { return executeCallable(new MasterCallable(getConnection()) { @Override public ClusterStatus call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); GetClusterStatusRequest req = RequestConverter.buildGetClusterStatusRequest(); - return ClusterStatus.convert(master.getClusterStatus(null, req).getClusterStatus()); + return ClusterStatus.convert(master.getClusterStatus(controller, req).getClusterStatus()); } }); } @@ -1793,18 +1927,21 @@ public class HBaseAdmin implements Admin { @Override public Future createNamespaceAsync(final NamespaceDescriptor descriptor) - throws IOException { + throws IOException { CreateNamespaceResponse response = executeCallable(new MasterCallable(getConnection()) { - @Override - public CreateNamespaceResponse call(int callTimeout) throws Exception { - return master.createNamespace(null, - CreateNamespaceRequest.newBuilder() - .setNamespaceDescriptor(ProtobufUtil - .toProtoNamespaceDescriptor(descriptor)).build() - ); - } - }); + @Override + public CreateNamespaceResponse call(int callTimeout) throws Exception { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + // TODO: set priority based on NS? + return master.createNamespace(controller, + CreateNamespaceRequest.newBuilder() + .setNamespaceDescriptor(ProtobufUtil + .toProtoNamespaceDescriptor(descriptor)).build() + ); + } + }); return new NamespaceFuture(this, descriptor.getName(), response.getProcId()) { @Override public String getOperationType() { @@ -1821,15 +1958,18 @@ public class HBaseAdmin implements Admin { @Override public Future modifyNamespaceAsync(final NamespaceDescriptor descriptor) - throws IOException { + throws IOException { ModifyNamespaceResponse response = executeCallable(new MasterCallable(getConnection()) { - @Override - public ModifyNamespaceResponse call(int callTimeout) throws Exception { - return master.modifyNamespace(null, ModifyNamespaceRequest.newBuilder(). - setNamespaceDescriptor(ProtobufUtil.toProtoNamespaceDescriptor(descriptor)).build()); - } - }); + @Override + public ModifyNamespaceResponse call(int callTimeout) throws Exception { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + // TODO: set priority based on NS? + return master.modifyNamespace(controller, ModifyNamespaceRequest.newBuilder(). + setNamespaceDescriptor(ProtobufUtil.toProtoNamespaceDescriptor(descriptor)).build()); + } + }); return new NamespaceFuture(this, descriptor.getName(), response.getProcId()) { @Override public String getOperationType() { @@ -1846,15 +1986,18 @@ public class HBaseAdmin implements Admin { @Override public Future deleteNamespaceAsync(final String name) - throws IOException { + throws IOException { DeleteNamespaceResponse response = executeCallable(new MasterCallable(getConnection()) { - @Override - public DeleteNamespaceResponse call(int callTimeout) throws Exception { - return master.deleteNamespace(null, DeleteNamespaceRequest.newBuilder(). - setNamespaceName(name).build()); - } - }); + @Override + public DeleteNamespaceResponse call(int callTimeout) throws Exception { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + // TODO: set priority based on NS? + return master.deleteNamespace(controller, DeleteNamespaceRequest.newBuilder(). + setNamespaceName(name).build()); + } + }); return new NamespaceFuture(this, name, response.getProcId()) { @Override public String getOperationType() { @@ -1869,8 +2012,10 @@ public class HBaseAdmin implements Admin { executeCallable(new MasterCallable(getConnection()) { @Override public NamespaceDescriptor call(int callTimeout) throws Exception { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); return ProtobufUtil.toNamespaceDescriptor( - master.getNamespaceDescriptor(null, GetNamespaceDescriptorRequest.newBuilder(). + master.getNamespaceDescriptor(controller, GetNamespaceDescriptorRequest.newBuilder(). setNamespaceName(name).build()).getNamespaceDescriptor()); } }); @@ -1882,9 +2027,12 @@ public class HBaseAdmin implements Admin { executeCallable(new MasterCallable(getConnection()) { @Override public NamespaceDescriptor[] call(int callTimeout) throws Exception { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); List list = - master.listNamespaceDescriptors(null, ListNamespaceDescriptorsRequest.newBuilder(). - build()).getNamespaceDescriptorList(); + master.listNamespaceDescriptors(controller, + ListNamespaceDescriptorsRequest.newBuilder().build()) + .getNamespaceDescriptorList(); NamespaceDescriptor[] res = new NamespaceDescriptor[list.size()]; for(int i = 0; i < list.size(); i++) { res[i] = ProtobufUtil.toNamespaceDescriptor(list.get(i)); @@ -1900,8 +2048,10 @@ public class HBaseAdmin implements Admin { executeCallable(new MasterCallable(getConnection()) { @Override public ProcedureInfo[] call(int callTimeout) throws Exception { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); List procList = master.listProcedures( - null, ListProceduresRequest.newBuilder().build()).getProcedureList(); + controller, ListProceduresRequest.newBuilder().build()).getProcedureList(); ProcedureInfo[] procInfoList = new ProcedureInfo[procList.size()]; for (int i = 0; i < procList.size(); i++) { procInfoList[i] = ProcedureInfo.convert(procList.get(i)); @@ -1917,9 +2067,12 @@ public class HBaseAdmin implements Admin { executeCallable(new MasterCallable(getConnection()) { @Override public HTableDescriptor[] call(int callTimeout) throws Exception { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); List list = - master.listTableDescriptorsByNamespace(null, ListTableDescriptorsByNamespaceRequest. - newBuilder().setNamespaceName(name).build()).getTableSchemaList(); + master.listTableDescriptorsByNamespace(controller, + ListTableDescriptorsByNamespaceRequest.newBuilder().setNamespaceName(name) + .build()).getTableSchemaList(); HTableDescriptor[] res = new HTableDescriptor[list.size()]; for(int i=0; i < list.size(); i++) { @@ -1936,8 +2089,10 @@ public class HBaseAdmin implements Admin { executeCallable(new MasterCallable(getConnection()) { @Override public TableName[] call(int callTimeout) throws Exception { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); List tableNames = - master.listTableNamesByNamespace(null, ListTableNamesByNamespaceRequest. + master.listTableNamesByNamespace(controller, ListTableNamesByNamespaceRequest. newBuilder().setNamespaceName(name).build()) .getTableNameList(); TableName[] result = new TableName[tableNames.size()]; @@ -2017,9 +2172,11 @@ public class HBaseAdmin implements Admin { return executeCallable(new MasterCallable(getConnection()) { @Override public HTableDescriptor[] call(int callTimeout) throws Exception { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); GetTableDescriptorsRequest req = RequestConverter.buildGetTableDescriptorsRequest(tableNames); - return ProtobufUtil.getHTableDescriptorArray(master.getTableDescriptors(null, req)); + return ProtobufUtil.getHTableDescriptorArray(master.getTableDescriptors(controller, req)); } }); } @@ -2059,8 +2216,11 @@ public class HBaseAdmin implements Admin { FailedLogCloseException { AdminService.BlockingInterface admin = this.connection.getAdmin(sn); RollWALWriterRequest request = RequestConverter.buildRollWALWriterRequest(); + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + try { - return admin.rollWALWriter(null, request); + // TODO: this does not do retries, it should. Set priority and timeout in controller + return admin.rollWALWriter(controller, request); } catch (ServiceException se) { throw ProtobufUtil.getRemoteException(se); } @@ -2142,7 +2302,9 @@ public class HBaseAdmin implements Admin { AdminService.BlockingInterface admin = this.connection.getAdmin(sn); GetRegionInfoRequest request = RequestConverter.buildGetRegionInfoRequest( regionServerPair.getFirst().getRegionName(), true); - GetRegionInfoResponse response = admin.getRegionInfo(null, request); + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + // TODO: this does not do retries, it should. Set priority and timeout in controller + GetRegionInfoResponse response = admin.getRegionInfo(controller, request); return response.getCompactionState(); } catch (ServiceException se) { throw ProtobufUtil.getRemoteException(se); @@ -2204,7 +2366,9 @@ public class HBaseAdmin implements Admin { done = executeCallable(new MasterCallable(getConnection()) { @Override public IsSnapshotDoneResponse call(int callTimeout) throws ServiceException { - return master.isSnapshotDone(null, request); + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + return master.isSnapshotDone(controller, request); } }); } @@ -2224,7 +2388,9 @@ public class HBaseAdmin implements Admin { return executeCallable(new MasterCallable(getConnection()) { @Override public SnapshotResponse call(int callTimeout) throws ServiceException { - return master.snapshot(null, request); + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + return master.snapshot(controller, request); } }); } @@ -2236,7 +2402,9 @@ public class HBaseAdmin implements Admin { return executeCallable(new MasterCallable(getConnection()) { @Override public IsSnapshotDoneResponse call(int callTimeout) throws ServiceException { - return master.isSnapshotDone(null, + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + return master.isSnapshotDone(controller, IsSnapshotDoneRequest.newBuilder().setSnapshot(snapshot).build()); } }).getDone(); @@ -2370,7 +2538,9 @@ public class HBaseAdmin implements Admin { getConnection()) { @Override public ExecProcedureResponse call(int callTimeout) throws ServiceException { - return master.execProcedureWithRet(null, request); + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + return master.execProcedureWithRet(controller, request); } }); @@ -2395,7 +2565,9 @@ public class HBaseAdmin implements Admin { getConnection()) { @Override public ExecProcedureResponse call(int callTimeout) throws ServiceException { - return master.execProcedure(null, request); + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + return master.execProcedure(controller, request); } }); @@ -2442,7 +2614,9 @@ public class HBaseAdmin implements Admin { new MasterCallable(getConnection()) { @Override public IsProcedureDoneResponse call(int callTimeout) throws ServiceException { - return master.isProcedureDone(null, IsProcedureDoneRequest + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + return master.isProcedureDone(controller, IsProcedureDoneRequest .newBuilder().setProcedure(desc).build()); } }).getDone(); @@ -2488,7 +2662,9 @@ public class HBaseAdmin implements Admin { getConnection()) { @Override public IsRestoreSnapshotDoneResponse call(int callTimeout) throws ServiceException { - return master.isRestoreSnapshotDone(null, request); + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + return master.isRestoreSnapshotDone(controller, request); } }); } @@ -2518,7 +2694,9 @@ public class HBaseAdmin implements Admin { return executeCallable(new MasterCallable(getConnection()) { @Override public RestoreSnapshotResponse call(int callTimeout) throws ServiceException { - return master.restoreSnapshot(null, request); + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + return master.restoreSnapshot(controller, request); } }); } @@ -2528,8 +2706,10 @@ public class HBaseAdmin implements Admin { return executeCallable(new MasterCallable>(getConnection()) { @Override public List call(int callTimeout) throws ServiceException { - return master.getCompletedSnapshots(null, GetCompletedSnapshotsRequest.newBuilder().build()) - .getSnapshotsList(); + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + return master.getCompletedSnapshots(controller, + GetCompletedSnapshotsRequest.newBuilder().build()).getSnapshotsList(); } }); } @@ -2587,7 +2767,9 @@ public class HBaseAdmin implements Admin { executeCallable(new MasterCallable(getConnection()) { @Override public Void call(int callTimeout) throws ServiceException { - master.deleteSnapshot(null, + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + master.deleteSnapshot(controller, DeleteSnapshotRequest.newBuilder(). setSnapshot(SnapshotDescription.newBuilder().setName(snapshotName).build()).build() ); @@ -2619,8 +2801,10 @@ public class HBaseAdmin implements Admin { executeCallable(new MasterCallable(getConnection()) { @Override public Void call(int callTimeout) throws ServiceException { - this.master.deleteSnapshot(null, DeleteSnapshotRequest.newBuilder().setSnapshot(snapshot) - .build()); + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + this.master.deleteSnapshot(controller, DeleteSnapshotRequest.newBuilder() + .setSnapshot(snapshot).build()); return null; } }); @@ -2651,7 +2835,9 @@ public class HBaseAdmin implements Admin { executeCallable(new MasterCallable(getConnection()) { @Override public Void call(int callTimeout) throws ServiceException { - this.master.setQuota(null, QuotaSettings.buildSetQuotaRequestProto(quota)); + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + this.master.setQuota(controller, QuotaSettings.buildSetQuotaRequestProto(quota)); return null; } }); @@ -2750,10 +2936,12 @@ public class HBaseAdmin implements Admin { return executeCallable(new MasterCallable(getConnection()) { @Override public Long call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); MajorCompactionTimestampRequest req = MajorCompactionTimestampRequest.newBuilder() .setTableName(ProtobufUtil.toProtoTableName(tableName)).build(); - return master.getLastMajorCompactionTimestamp(null, req).getCompactionTimestamp(); + return master.getLastMajorCompactionTimestamp(controller, req).getCompactionTimestamp(); } }); } @@ -2763,13 +2951,16 @@ public class HBaseAdmin implements Admin { return executeCallable(new MasterCallable(getConnection()) { @Override public Long call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); MajorCompactionTimestampForRegionRequest req = MajorCompactionTimestampForRegionRequest .newBuilder() .setRegion( RequestConverter .buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName)).build(); - return master.getLastMajorCompactionTimestampForRegion(null, req).getCompactionTimestamp(); + return master.getLastMajorCompactionTimestampForRegion(controller, req) + .getCompactionTimestamp(); } }); } @@ -2818,6 +3009,7 @@ public class HBaseAdmin implements Admin { CompactType compactType) throws IOException { CompactionState state = CompactionState.NONE; checkTableExists(tableName); + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); switch (compactType) { case MOB: try { @@ -2826,7 +3018,7 @@ public class HBaseAdmin implements Admin { GetRegionInfoRequest request = RequestConverter.buildGetRegionInfoRequest( info.getRegionName(), true); GetRegionInfoResponse response = this.connection.getAdmin(master) - .getRegionInfo(null, request); + .getRegionInfo(controller, request); state = response.getCompactionState(); } catch (ServiceException se) { throw ProtobufUtil.getRemoteException(se); @@ -2852,7 +3044,7 @@ public class HBaseAdmin implements Admin { AdminService.BlockingInterface admin = this.connection.getAdmin(sn); GetRegionInfoRequest request = RequestConverter.buildGetRegionInfoRequest( pair.getFirst().getRegionName(), true); - GetRegionInfoResponse response = admin.getRegionInfo(null, request); + GetRegionInfoResponse response = admin.getRegionInfo(controller, request); switch (response.getCompactionState()) { case MAJOR_AND_MINOR: return CompactionState.MAJOR_AND_MINOR; @@ -2952,7 +3144,9 @@ public class HBaseAdmin implements Admin { admin.getConnection()) { @Override public AbortProcedureResponse call(int callTimeout) throws ServiceException { - return master.abortProcedure(null, request); + PayloadCarryingRpcController controller = admin.getRpcControllerFactory().newController(); + controller.setCallTimeout(callTimeout); + return master.abortProcedure(controller, request); } }); } @@ -3366,9 +3560,11 @@ public class HBaseAdmin implements Admin { return executeCallable(new MasterCallable>(getConnection()) { @Override public List call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); SecurityCapabilitiesRequest req = SecurityCapabilitiesRequest.newBuilder().build(); return ProtobufUtil.toSecurityCapabilityList( - master.getSecurityCapabilities(null, req).getCapabilitiesList()); + master.getSecurityCapabilities(controller, req).getCapabilitiesList()); } }); } catch (IOException e) { @@ -3414,4 +3610,7 @@ public class HBaseAdmin implements Admin { HConstants.EMPTY_END_ROW, false, 0); } + private RpcControllerFactory getRpcControllerFactory() { + return rpcControllerFactory; + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index 33fd94e9deb..befc6714cb0 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -105,7 +105,7 @@ public class HTable implements HTableInterface { protected ClusterConnection connection; private final TableName tableName; private volatile Configuration configuration; - private TableConfiguration tableConfiguration; + private ConnectionConfiguration connConfiguration; protected BufferedMutatorImpl mutator; private boolean autoFlush = true; private boolean closed = false; @@ -154,7 +154,7 @@ public class HTable implements HTableInterface { */ @InterfaceAudience.Private protected HTable(TableName tableName, final ClusterConnection connection, - final TableConfiguration tableConfig, + final ConnectionConfiguration tableConfig, final RpcRetryingCallerFactory rpcCallerFactory, final RpcControllerFactory rpcControllerFactory, final ExecutorService pool) throws IOException { @@ -165,7 +165,7 @@ public class HTable implements HTableInterface { this.cleanupConnectionOnClose = false; this.connection = connection; this.configuration = connection.getConfiguration(); - this.tableConfiguration = tableConfig; + this.connConfiguration = tableConfig; this.pool = pool; if (pool == null) { this.pool = getDefaultExecutor(this.configuration); @@ -188,7 +188,7 @@ public class HTable implements HTableInterface { protected HTable(ClusterConnection conn, BufferedMutatorParams params) throws IOException { connection = conn; tableName = params.getTableName(); - tableConfiguration = new TableConfiguration(connection.getConfiguration()); + connConfiguration = new ConnectionConfiguration(connection.getConfiguration()); cleanupPoolOnClose = false; cleanupConnectionOnClose = false; // used from tests, don't trust the connection is real @@ -206,14 +206,14 @@ public class HTable implements HTableInterface { * setup this HTable's parameter based on the passed configuration */ private void finishSetup() throws IOException { - if (tableConfiguration == null) { - tableConfiguration = new TableConfiguration(configuration); + if (connConfiguration == null) { + connConfiguration = new ConnectionConfiguration(configuration); } this.operationTimeout = tableName.isSystemTable() ? - tableConfiguration.getMetaOperationTimeout() : tableConfiguration.getOperationTimeout(); - this.scannerCaching = tableConfiguration.getScannerCaching(); - this.scannerMaxResultSize = tableConfiguration.getScannerMaxResultSize(); + connConfiguration.getMetaOperationTimeout() : connConfiguration.getOperationTimeout(); + this.scannerCaching = connConfiguration.getScannerCaching(); + this.scannerMaxResultSize = connConfiguration.getScannerMaxResultSize(); if (this.rpcCallerFactory == null) { this.rpcCallerFactory = connection.getNewRpcRetryingCallerFactory(configuration); } @@ -265,23 +265,14 @@ public class HTable implements HTableInterface { */ @Override public HTableDescriptor getTableDescriptor() throws IOException { - HTableDescriptor htd = HBaseAdmin.getTableDescriptor(tableName, connection, - rpcCallerFactory, operationTimeout); + HTableDescriptor htd = HBaseAdmin.getTableDescriptor(tableName, connection, rpcCallerFactory, + rpcControllerFactory, operationTimeout); if (htd != null) { return new UnmodifyableHTableDescriptor(htd); } return null; } - private V executeMasterCallable(MasterCallable callable) throws IOException { - RpcRetryingCaller caller = rpcCallerFactory.newCaller(); - try { - return caller.callWithRetries(callable, operationTimeout); - } finally { - callable.close(); - } - } - /** * Get the corresponding start keys and regions for an arbitrary range of * keys. @@ -354,34 +345,34 @@ public class HTable implements HTableInterface { Boolean async = scan.isAsyncPrefetch(); if (async == null) { - async = tableConfiguration.isClientScannerAsyncPrefetch(); + async = connConfiguration.isClientScannerAsyncPrefetch(); } if (scan.isReversed()) { if (scan.isSmall()) { return new ClientSmallReversedScanner(getConfiguration(), scan, getName(), this.connection, this.rpcCallerFactory, this.rpcControllerFactory, - pool, tableConfiguration.getReplicaCallTimeoutMicroSecondScan()); + pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan()); } else { return new ReversedClientScanner(getConfiguration(), scan, getName(), this.connection, this.rpcCallerFactory, this.rpcControllerFactory, - pool, tableConfiguration.getReplicaCallTimeoutMicroSecondScan()); + pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan()); } } if (scan.isSmall()) { return new ClientSmallScanner(getConfiguration(), scan, getName(), this.connection, this.rpcCallerFactory, this.rpcControllerFactory, - pool, tableConfiguration.getReplicaCallTimeoutMicroSecondScan()); + pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan()); } else { if (async) { return new ClientAsyncPrefetchScanner(getConfiguration(), scan, getName(), this.connection, this.rpcCallerFactory, this.rpcControllerFactory, - pool, tableConfiguration.getReplicaCallTimeoutMicroSecondScan()); + pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan()); } else { return new ClientSimpleScanner(getConfiguration(), scan, getName(), this.connection, this.rpcCallerFactory, this.rpcControllerFactory, - pool, tableConfiguration.getReplicaCallTimeoutMicroSecondScan()); + pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan()); } } } @@ -454,9 +445,9 @@ public class HTable implements HTableInterface { // Call that takes into account the replica RpcRetryingCallerWithReadReplicas callable = new RpcRetryingCallerWithReadReplicas( rpcControllerFactory, tableName, this.connection, get, pool, - tableConfiguration.getRetriesNumber(), + connConfiguration.getRetriesNumber(), operationTimeout, - tableConfiguration.getPrimaryCallTimeoutMicroSecond()); + connConfiguration.getPrimaryCallTimeoutMicroSecond()); return callable.call(); } @@ -1039,7 +1030,7 @@ public class HTable implements HTableInterface { // validate for well-formedness public void validatePut(final Put put) throws IllegalArgumentException { - validatePut(put, tableConfiguration.getMaxKeyValueSize()); + validatePut(put, connConfiguration.getMaxKeyValueSize()); } // validate for well-formedness @@ -1092,7 +1083,7 @@ public class HTable implements HTableInterface { @Override public long getWriteBufferSize() { if (mutator == null) { - return tableConfiguration.getWriteBufferSize(); + return connConfiguration.getWriteBufferSize(); } else { return mutator.getWriteBufferSize(); } @@ -1344,8 +1335,8 @@ public class HTable implements HTableInterface { this.mutator = (BufferedMutatorImpl) connection.getBufferedMutator( new BufferedMutatorParams(tableName) .pool(pool) - .writeBufferSize(tableConfiguration.getWriteBufferSize()) - .maxKeyValueSize(tableConfiguration.getMaxKeyValueSize()) + .writeBufferSize(connConfiguration.getWriteBufferSize()) + .maxKeyValueSize(connConfiguration.getMaxKeyValueSize()) ); } return mutator; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java index a53fb707082..ec6332a5e12 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java @@ -318,7 +318,7 @@ public abstract class AbstractRpcClient implements RpcClient { public Message callBlockingMethod(Descriptors.MethodDescriptor md, RpcController controller, Message param, Message returnType) throws ServiceException { PayloadCarryingRpcController pcrc; - if (controller != null) { + if (controller != null && controller instanceof PayloadCarryingRpcController) { pcrc = (PayloadCarryingRpcController) controller; if (!pcrc.hasCallTimeout()) { pcrc.setCallTimeout(channelOperationTimeout); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CoprocessorRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CoprocessorRpcChannel.java index e60fbd6ba04..b1d54a4b352 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CoprocessorRpcChannel.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CoprocessorRpcChannel.java @@ -36,7 +36,7 @@ import com.google.protobuf.ServiceException; /** * Base class which provides clients with an RPC connection to - * call coprocessor endpoint {@link com.google.protobuf.Service}s. + * call coprocessor endpoint {@link com.google.protobuf.Service}s. * Note that clients should not use this class directly, except through * {@link org.apache.hadoop.hbase.client.HTableInterface#coprocessorService(byte[])}. */ @@ -53,7 +53,7 @@ public abstract class CoprocessorRpcChannel implements RpcChannel, BlockingRpcCh RpcCallback callback) { Message response = null; try { - response = callExecService(method, request, responsePrototype); + response = callExecService(controller, method, request, responsePrototype); } catch (IOException ioe) { LOG.warn("Call failed on IOException", ioe); ResponseConverter.setControllerException(controller, ioe); @@ -70,12 +70,13 @@ public abstract class CoprocessorRpcChannel implements RpcChannel, BlockingRpcCh Message request, Message responsePrototype) throws ServiceException { try { - return callExecService(method, request, responsePrototype); + return callExecService(controller, method, request, responsePrototype); } catch (IOException ioe) { throw new ServiceException("Error calling method "+method.getFullName(), ioe); } } - protected abstract Message callExecService(Descriptors.MethodDescriptor method, - Message request, Message responsePrototype) throws IOException; + protected abstract Message callExecService(RpcController controller, + Descriptors.MethodDescriptor method, Message request, Message responsePrototype) + throws IOException; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java index 98a74eff11a..6e59972f88f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java @@ -24,7 +24,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse; @@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.util.ByteStringer; import com.google.protobuf.Descriptors; import com.google.protobuf.Message; +import com.google.protobuf.RpcController; /** * Provides clients with an RPC connection to call coprocessor endpoint {@link com.google.protobuf.Service}s @@ -45,18 +46,18 @@ import com.google.protobuf.Message; public class MasterCoprocessorRpcChannel extends CoprocessorRpcChannel{ private static final Log LOG = LogFactory.getLog(MasterCoprocessorRpcChannel.class); - private final HConnection connection; + private final ClusterConnection connection; - public MasterCoprocessorRpcChannel(HConnection conn) { + public MasterCoprocessorRpcChannel(ClusterConnection conn) { this.connection = conn; } @Override - protected Message callExecService(Descriptors.MethodDescriptor method, + protected Message callExecService(RpcController controller, Descriptors.MethodDescriptor method, Message request, Message responsePrototype) throws IOException { - if (LOG.isDebugEnabled()) { - LOG.debug("Call: "+method.getName()+", "+request.toString()); + if (LOG.isTraceEnabled()) { + LOG.trace("Call: "+method.getName()+", "+request.toString()); } final ClientProtos.CoprocessorServiceCall call = @@ -65,7 +66,10 @@ public class MasterCoprocessorRpcChannel extends CoprocessorRpcChannel{ .setServiceName(method.getService().getFullName()) .setMethodName(method.getName()) .setRequest(request.toByteString()).build(); - CoprocessorServiceResponse result = ProtobufUtil.execService(connection.getMaster(), call); + + // TODO: Are we retrying here? Does not seem so. We should use RetryingRpcCaller + CoprocessorServiceResponse result = ProtobufUtil.execService(controller, + connection.getMaster(), call); Message response = null; if (result.getValue().hasValue()) { Message.Builder builder = responsePrototype.newBuilderForType(); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java index 3fcfcebe6d7..321dd6274b3 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java @@ -22,10 +22,9 @@ import java.io.IOException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.RegionServerCallable; import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; @@ -36,6 +35,7 @@ import org.apache.hadoop.hbase.util.Bytes; import com.google.protobuf.Descriptors; import com.google.protobuf.Message; +import com.google.protobuf.RpcController; /** * Provides clients with an RPC connection to call coprocessor endpoint {@link com.google.protobuf.Service}s @@ -49,28 +49,28 @@ import com.google.protobuf.Message; public class RegionCoprocessorRpcChannel extends CoprocessorRpcChannel{ private static final Log LOG = LogFactory.getLog(RegionCoprocessorRpcChannel.class); - private final HConnection connection; + private final ClusterConnection connection; private final TableName table; private final byte[] row; private byte[] lastRegion; private int operationTimeout; - private RpcRetryingCallerFactory rpcFactory; + private RpcRetryingCallerFactory rpcCallerFactory; + private RpcControllerFactory rpcControllerFactory; - public RegionCoprocessorRpcChannel(HConnection conn, TableName table, byte[] row) { + public RegionCoprocessorRpcChannel(ClusterConnection conn, TableName table, byte[] row) { this.connection = conn; this.table = table; this.row = row; - this.rpcFactory = RpcRetryingCallerFactory.instantiate(conn.getConfiguration(), null); - this.operationTimeout = conn.getConfiguration().getInt( - HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, - HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); + this.rpcCallerFactory = conn.getRpcRetryingCallerFactory(); + this.rpcControllerFactory = conn.getRpcControllerFactory(); + this.operationTimeout = conn.getConnectionConfiguration().getOperationTimeout(); } @Override - protected Message callExecService(Descriptors.MethodDescriptor method, - Message request, Message responsePrototype) - throws IOException { + protected Message callExecService(RpcController controller, + Descriptors.MethodDescriptor method, Message request, Message responsePrototype) + throws IOException { if (LOG.isTraceEnabled()) { LOG.trace("Call: "+method.getName()+", "+request.toString()); } @@ -79,6 +79,9 @@ public class RegionCoprocessorRpcChannel extends CoprocessorRpcChannel{ throw new IllegalArgumentException("Missing row property for remote region location"); } + final RpcController rpcController = controller == null + ? rpcControllerFactory.newController() : controller; + final ClientProtos.CoprocessorServiceCall call = ClientProtos.CoprocessorServiceCall.newBuilder() .setRow(ByteStringer.wrap(row)) @@ -87,12 +90,19 @@ public class RegionCoprocessorRpcChannel extends CoprocessorRpcChannel{ .setRequest(request.toByteString()).build(); RegionServerCallable callable = new RegionServerCallable(connection, table, row) { - public CoprocessorServiceResponse call(int callTimeout) throws Exception { - byte[] regionName = getLocation().getRegionInfo().getRegionName(); - return ProtobufUtil.execService(getStub(), call, regionName); - } - }; - CoprocessorServiceResponse result = rpcFactory. newCaller() + @Override + public CoprocessorServiceResponse call(int callTimeout) throws Exception { + if (rpcController instanceof PayloadCarryingRpcController) { + ((PayloadCarryingRpcController) rpcController).setPriority(tableName); + } + if (rpcController instanceof TimeLimitedRpcController) { + ((TimeLimitedRpcController) rpcController).setCallTimeout(callTimeout); + } + byte[] regionName = getLocation().getRegionInfo().getRegionName(); + return ProtobufUtil.execService(rpcController, getStub(), call, regionName); + } + }; + CoprocessorServiceResponse result = rpcCallerFactory. newCaller() .callWithRetries(callable, operationTimeout); Message response = null; if (result.getValue().hasValue()) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionServerCoprocessorRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionServerCoprocessorRpcChannel.java index 3f0a5d99803..24d2de4aa9c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionServerCoprocessorRpcChannel.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionServerCoprocessorRpcChannel.java @@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.util.ByteStringer; import com.google.protobuf.Descriptors; import com.google.protobuf.Message; +import com.google.protobuf.RpcController; /** * Provides clients with an RPC connection to call coprocessor endpoint @@ -47,8 +48,9 @@ public class RegionServerCoprocessorRpcChannel extends CoprocessorRpcChannel { } @Override - protected Message callExecService(Descriptors.MethodDescriptor method, Message request, - Message responsePrototype) throws IOException { + protected Message callExecService(RpcController controller, + Descriptors.MethodDescriptor method, Message request, Message responsePrototype) + throws IOException { if (LOG.isTraceEnabled()) { LOG.trace("Call: " + method.getName() + ", " + request.toString()); } @@ -57,8 +59,10 @@ public class RegionServerCoprocessorRpcChannel extends CoprocessorRpcChannel { .setRow(ByteStringer.wrap(HConstants.EMPTY_BYTE_ARRAY)) .setServiceName(method.getService().getFullName()).setMethodName(method.getName()) .setRequest(request.toByteString()).build(); + + // TODO: Are we retrying here? Does not seem so. We should use RetryingRpcCaller CoprocessorServiceResponse result = - ProtobufUtil.execRegionServerService(connection.getClient(serverName), call); + ProtobufUtil.execRegionServerService(controller, connection.getClient(serverName), call); Message response = null; if (result.getValue().hasValue()) { Message.Builder builder = responsePrototype.newBuilderForType(); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java index 263677756b1..f9fa21c85dd 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java @@ -173,6 +173,19 @@ import org.apache.hadoop.security.token.Token; import static org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier .RegionSpecifierType.REGION_NAME; +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.ListMultimap; +import com.google.common.collect.Lists; +import com.google.protobuf.ByteString; +import com.google.protobuf.CodedInputStream; +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.Message; +import com.google.protobuf.Parser; +import com.google.protobuf.RpcChannel; +import com.google.protobuf.RpcController; +import com.google.protobuf.Service; +import com.google.protobuf.ServiceException; +import com.google.protobuf.TextFormat; /** * Protobufs utility. @@ -1575,21 +1588,22 @@ public final class ProtobufUtil { } } - public static CoprocessorServiceResponse execService(final ClientService.BlockingInterface client, - final CoprocessorServiceCall call, final byte[] regionName) throws IOException { + public static CoprocessorServiceResponse execService(final RpcController controller, + final ClientService.BlockingInterface client, final CoprocessorServiceCall call, + final byte[] regionName) throws IOException { CoprocessorServiceRequest request = CoprocessorServiceRequest.newBuilder() .setCall(call).setRegion( RequestConverter.buildRegionSpecifier(REGION_NAME, regionName)).build(); try { CoprocessorServiceResponse response = - client.execService(null, request); + client.execService(controller, request); return response; } catch (ServiceException se) { throw getRemoteException(se); } } - public static CoprocessorServiceResponse execService( + public static CoprocessorServiceResponse execService(final RpcController controller, final MasterService.BlockingInterface client, final CoprocessorServiceCall call) throws IOException { CoprocessorServiceRequest request = CoprocessorServiceRequest.newBuilder() @@ -1597,7 +1611,7 @@ public final class ProtobufUtil { RequestConverter.buildRegionSpecifier(REGION_NAME, HConstants.EMPTY_BYTE_ARRAY)).build(); try { CoprocessorServiceResponse response = - client.execMasterService(null, request); + client.execMasterService(controller, request); return response; } catch (ServiceException se) { throw getRemoteException(se); @@ -1612,7 +1626,8 @@ public final class ProtobufUtil { * @throws IOException */ public static CoprocessorServiceResponse execRegionServerService( - final ClientService.BlockingInterface client, final CoprocessorServiceCall call) + final RpcController controller, final ClientService.BlockingInterface client, + final CoprocessorServiceCall call) throws IOException { CoprocessorServiceRequest request = CoprocessorServiceRequest @@ -1622,7 +1637,7 @@ public final class ProtobufUtil { RequestConverter.buildRegionSpecifier(REGION_NAME, HConstants.EMPTY_BYTE_ARRAY)) .build(); try { - CoprocessorServiceResponse response = client.execRegionServerService(null, request); + CoprocessorServiceResponse response = client.execRegionServerService(controller, request); return response; } catch (ServiceException se) { throw getRemoteException(se); @@ -1648,13 +1663,13 @@ public final class ProtobufUtil { * @return the retrieved region info * @throws IOException */ - public static HRegionInfo getRegionInfo(final AdminService.BlockingInterface admin, - final byte[] regionName) throws IOException { + public static HRegionInfo getRegionInfo(final RpcController controller, + final AdminService.BlockingInterface admin, final byte[] regionName) throws IOException { try { GetRegionInfoRequest request = RequestConverter.buildGetRegionInfoRequest(regionName); GetRegionInfoResponse response = - admin.getRegionInfo(null, request); + admin.getRegionInfo(controller, request); return HRegionInfo.convert(response.getRegionInfo()); } catch (ServiceException se) { throw getRemoteException(se); @@ -1669,12 +1684,13 @@ public final class ProtobufUtil { * @param regionName * @throws IOException */ - public static void closeRegion(final AdminService.BlockingInterface admin, - final ServerName server, final byte[] regionName) throws IOException { + public static void closeRegion(final RpcController controller, + final AdminService.BlockingInterface admin, final ServerName server, final byte[] regionName) + throws IOException { CloseRegionRequest closeRegionRequest = RequestConverter.buildCloseRegionRequest(server, regionName); try { - admin.closeRegion(null, closeRegionRequest); + admin.closeRegion(controller, closeRegionRequest); } catch (ServiceException se) { throw getRemoteException(se); } @@ -1689,14 +1705,15 @@ public final class ProtobufUtil { * @return true if the region is closed * @throws IOException */ - public static boolean closeRegion(final AdminService.BlockingInterface admin, + public static boolean closeRegion(final RpcController controller, + final AdminService.BlockingInterface admin, final ServerName server, final byte[] regionName, final ServerName destinationServer) throws IOException { CloseRegionRequest closeRegionRequest = RequestConverter.buildCloseRegionRequest(server, regionName, destinationServer); try { - CloseRegionResponse response = admin.closeRegion(null, closeRegionRequest); + CloseRegionResponse response = admin.closeRegion(controller, closeRegionRequest); return ResponseConverter.isClosed(response); } catch (ServiceException se) { throw getRemoteException(se); @@ -1711,14 +1728,14 @@ public final class ProtobufUtil { * @param regionInfo * */ - public static void warmupRegion(final AdminService.BlockingInterface admin, - final HRegionInfo regionInfo) throws IOException { + public static void warmupRegion(final RpcController controller, + final AdminService.BlockingInterface admin, final HRegionInfo regionInfo) throws IOException { try { WarmupRegionRequest warmupRegionRequest = RequestConverter.buildWarmupRegionRequest(regionInfo); - admin.warmupRegion(null, warmupRegionRequest); + admin.warmupRegion(controller, warmupRegionRequest); } catch (ServiceException e) { throw getRemoteException(e); } @@ -1730,18 +1747,18 @@ public final class ProtobufUtil { * @param region * @throws IOException */ - public static void openRegion(final AdminService.BlockingInterface admin, - ServerName server, final HRegionInfo region) throws IOException { + public static void openRegion(final RpcController controller, + final AdminService.BlockingInterface admin, ServerName server, final HRegionInfo region) + throws IOException { OpenRegionRequest request = RequestConverter.buildOpenRegionRequest(server, region, null, null); try { - admin.openRegion(null, request); + admin.openRegion(controller, request); } catch (ServiceException se) { throw ProtobufUtil.getRemoteException(se); } } - /** * A helper to get the all the online regions on a region * server using admin protocol. @@ -1751,11 +1768,22 @@ public final class ProtobufUtil { * @throws IOException */ public static List getOnlineRegions(final AdminService.BlockingInterface admin) + throws IOException { + return getOnlineRegions(null, admin); + } + + /** + * A helper to get the all the online regions on a region + * server using admin protocol. + * @return a list of online region info + */ + public static List getOnlineRegions(final RpcController controller, + final AdminService.BlockingInterface admin) throws IOException { GetOnlineRegionRequest request = RequestConverter.buildGetOnlineRegionRequest(); GetOnlineRegionResponse response = null; try { - response = admin.getOnlineRegion(null, request); + response = admin.getOnlineRegion(controller, request); } catch (ServiceException se) { throw getRemoteException(se); } @@ -1779,16 +1807,14 @@ public final class ProtobufUtil { /** * A helper to get the info of a region server using admin protocol. - * - * @param admin * @return the server name - * @throws IOException */ - public static ServerInfo getServerInfo(final AdminService.BlockingInterface admin) + public static ServerInfo getServerInfo(final RpcController controller, + final AdminService.BlockingInterface admin) throws IOException { GetServerInfoRequest request = RequestConverter.buildGetServerInfoRequest(); try { - GetServerInfoResponse response = admin.getServerInfo(null, request); + GetServerInfoResponse response = admin.getServerInfo(controller, request); return response.getServerInfo(); } catch (ServiceException se) { throw getRemoteException(se); @@ -1799,19 +1825,27 @@ public final class ProtobufUtil { * A helper to get the list of files of a column family * on a given region using admin protocol. * - * @param admin - * @param regionName - * @param family * @return the list of store files - * @throws IOException */ public static List getStoreFiles(final AdminService.BlockingInterface admin, final byte[] regionName, final byte[] family) + throws IOException { + return getStoreFiles(null, admin, regionName, family); + } + + /** + * A helper to get the list of files of a column family + * on a given region using admin protocol. + * + * @return the list of store files + */ + public static List getStoreFiles(final RpcController controller, + final AdminService.BlockingInterface admin, final byte[] regionName, final byte[] family) throws IOException { GetStoreFileRequest request = RequestConverter.buildGetStoreFileRequest(regionName, family); try { - GetStoreFileResponse response = admin.getStoreFile(null, request); + GetStoreFileResponse response = admin.getStoreFile(controller, request); return response.getStoreFileList(); } catch (ServiceException se) { throw ProtobufUtil.getRemoteException(se); @@ -1826,12 +1860,13 @@ public final class ProtobufUtil { * @param splitPoint * @throws IOException */ - public static void split(final AdminService.BlockingInterface admin, - final HRegionInfo hri, byte[] splitPoint) throws IOException { + public static void split(final RpcController controller, + final AdminService.BlockingInterface admin, final HRegionInfo hri, byte[] splitPoint) + throws IOException { SplitRegionRequest request = RequestConverter.buildSplitRegionRequest(hri.getRegionName(), splitPoint); try { - admin.splitRegion(null, request); + admin.splitRegion(controller, request); } catch (ServiceException se) { throw ProtobufUtil.getRemoteException(se); } @@ -1848,7 +1883,8 @@ public final class ProtobufUtil { * @param user effective user * @throws IOException */ - public static void mergeRegions(final AdminService.BlockingInterface admin, + public static void mergeRegions(final RpcController controller, + final AdminService.BlockingInterface admin, final HRegionInfo region_a, final HRegionInfo region_b, final boolean forcible, final User user) throws IOException { final MergeRegionsRequest request = RequestConverter.buildMergeRegionsRequest( @@ -1858,7 +1894,7 @@ public final class ProtobufUtil { user.getUGI().doAs(new PrivilegedExceptionAction() { @Override public Void run() throws Exception { - admin.mergeRegions(null, request); + admin.mergeRegions(controller, request); return null; } }); @@ -1869,7 +1905,7 @@ public final class ProtobufUtil { } } else { try { - admin.mergeRegions(null, request); + admin.mergeRegions(controller, request); } catch (ServiceException se) { throw ProtobufUtil.getRemoteException(se); } @@ -2144,8 +2180,9 @@ public final class ProtobufUtil { * @param actions the permissions to be granted * @throws ServiceException */ - public static void grant(AccessControlService.BlockingInterface protocol, - String userShortName, Permission.Action... actions) throws ServiceException { + public static void grant(RpcController controller, + AccessControlService.BlockingInterface protocol, String userShortName, + Permission.Action... actions) throws ServiceException { List permActions = Lists.newArrayListWithCapacity(actions.length); for (Permission.Action a : actions) { @@ -2154,7 +2191,7 @@ public final class ProtobufUtil { AccessControlProtos.GrantRequest request = RequestConverter. buildGrantRequest(userShortName, permActions.toArray( new AccessControlProtos.Permission.Action[actions.length])); - protocol.grant(null, request); + protocol.grant(controller, request); } /** @@ -2171,9 +2208,9 @@ public final class ProtobufUtil { * @param actions the permissions to be granted * @throws ServiceException */ - public static void grant(AccessControlService.BlockingInterface protocol, - String userShortName, TableName tableName, byte[] f, byte[] q, - Permission.Action... actions) throws ServiceException { + public static void grant(RpcController controller, + AccessControlService.BlockingInterface protocol, String userShortName, TableName tableName, + byte[] f, byte[] q, Permission.Action... actions) throws ServiceException { List permActions = Lists.newArrayListWithCapacity(actions.length); for (Permission.Action a : actions) { @@ -2182,7 +2219,7 @@ public final class ProtobufUtil { AccessControlProtos.GrantRequest request = RequestConverter. buildGrantRequest(userShortName, tableName, f, q, permActions.toArray( new AccessControlProtos.Permission.Action[actions.length])); - protocol.grant(null, request); + protocol.grant(controller, request); } /** @@ -2195,8 +2232,8 @@ public final class ProtobufUtil { * @param actions the permissions to be granted * @throws ServiceException */ - public static void grant(AccessControlService.BlockingInterface protocol, - String userShortName, String namespace, + public static void grant(RpcController controller, + AccessControlService.BlockingInterface protocol, String userShortName, String namespace, Permission.Action... actions) throws ServiceException { List permActions = Lists.newArrayListWithCapacity(actions.length); @@ -2206,7 +2243,7 @@ public final class ProtobufUtil { AccessControlProtos.GrantRequest request = RequestConverter. buildGrantRequest(userShortName, namespace, permActions.toArray( new AccessControlProtos.Permission.Action[actions.length])); - protocol.grant(null, request); + protocol.grant(controller, request); } /** @@ -2219,8 +2256,9 @@ public final class ProtobufUtil { * @param actions the permissions to be revoked * @throws ServiceException */ - public static void revoke(AccessControlService.BlockingInterface protocol, - String userShortName, Permission.Action... actions) throws ServiceException { + public static void revoke(RpcController controller, + AccessControlService.BlockingInterface protocol, String userShortName, + Permission.Action... actions) throws ServiceException { List permActions = Lists.newArrayListWithCapacity(actions.length); for (Permission.Action a : actions) { @@ -2229,7 +2267,7 @@ public final class ProtobufUtil { AccessControlProtos.RevokeRequest request = RequestConverter. buildRevokeRequest(userShortName, permActions.toArray( new AccessControlProtos.Permission.Action[actions.length])); - protocol.revoke(null, request); + protocol.revoke(controller, request); } /** @@ -2246,9 +2284,9 @@ public final class ProtobufUtil { * @param actions the permissions to be revoked * @throws ServiceException */ - public static void revoke(AccessControlService.BlockingInterface protocol, - String userShortName, TableName tableName, byte[] f, byte[] q, - Permission.Action... actions) throws ServiceException { + public static void revoke(RpcController controller, + AccessControlService.BlockingInterface protocol, String userShortName, TableName tableName, + byte[] f, byte[] q, Permission.Action... actions) throws ServiceException { List permActions = Lists.newArrayListWithCapacity(actions.length); for (Permission.Action a : actions) { @@ -2257,7 +2295,7 @@ public final class ProtobufUtil { AccessControlProtos.RevokeRequest request = RequestConverter. buildRevokeRequest(userShortName, tableName, f, q, permActions.toArray( new AccessControlProtos.Permission.Action[actions.length])); - protocol.revoke(null, request); + protocol.revoke(controller, request); } /** @@ -2271,8 +2309,8 @@ public final class ProtobufUtil { * @param actions the permissions to be revoked * @throws ServiceException */ - public static void revoke(AccessControlService.BlockingInterface protocol, - String userShortName, String namespace, + public static void revoke(RpcController controller, + AccessControlService.BlockingInterface protocol, String userShortName, String namespace, Permission.Action... actions) throws ServiceException { List permActions = Lists.newArrayListWithCapacity(actions.length); @@ -2282,7 +2320,7 @@ public final class ProtobufUtil { AccessControlProtos.RevokeRequest request = RequestConverter. buildRevokeRequest(userShortName, namespace, permActions.toArray( new AccessControlProtos.Permission.Action[actions.length])); - protocol.revoke(null, request); + protocol.revoke(controller, request); } /** @@ -2293,14 +2331,14 @@ public final class ProtobufUtil { * @param protocol the AccessControlService protocol proxy * @throws ServiceException */ - public static List getUserPermissions( + public static List getUserPermissions(RpcController controller, AccessControlService.BlockingInterface protocol) throws ServiceException { AccessControlProtos.GetUserPermissionsRequest.Builder builder = AccessControlProtos.GetUserPermissionsRequest.newBuilder(); builder.setType(AccessControlProtos.Permission.Type.Global); AccessControlProtos.GetUserPermissionsRequest request = builder.build(); AccessControlProtos.GetUserPermissionsResponse response = - protocol.getUserPermissions(null, request); + protocol.getUserPermissions(controller, request); List perms = new ArrayList(response.getUserPermissionCount()); for (AccessControlProtos.UserPermission perm: response.getUserPermissionList()) { perms.add(ProtobufUtil.toUserPermission(perm)); @@ -2317,7 +2355,7 @@ public final class ProtobufUtil { * @param t optional table name * @throws ServiceException */ - public static List getUserPermissions( + public static List getUserPermissions(RpcController controller, AccessControlService.BlockingInterface protocol, TableName t) throws ServiceException { AccessControlProtos.GetUserPermissionsRequest.Builder builder = @@ -2328,7 +2366,7 @@ public final class ProtobufUtil { builder.setType(AccessControlProtos.Permission.Type.Table); AccessControlProtos.GetUserPermissionsRequest request = builder.build(); AccessControlProtos.GetUserPermissionsResponse response = - protocol.getUserPermissions(null, request); + protocol.getUserPermissions(controller, request); List perms = new ArrayList(response.getUserPermissionCount()); for (AccessControlProtos.UserPermission perm: response.getUserPermissionList()) { perms.add(ProtobufUtil.toUserPermission(perm)); @@ -2345,7 +2383,7 @@ public final class ProtobufUtil { * @param namespace name of the namespace * @throws ServiceException */ - public static List getUserPermissions( + public static List getUserPermissions(RpcController controller, AccessControlService.BlockingInterface protocol, byte[] namespace) throws ServiceException { AccessControlProtos.GetUserPermissionsRequest.Builder builder = @@ -2356,7 +2394,7 @@ public final class ProtobufUtil { builder.setType(AccessControlProtos.Permission.Type.Namespace); AccessControlProtos.GetUserPermissionsRequest request = builder.build(); AccessControlProtos.GetUserPermissionsResponse response = - protocol.getUserPermissions(null, request); + protocol.getUserPermissions(controller, request); List perms = new ArrayList(response.getUserPermissionCount()); for (AccessControlProtos.UserPermission perm: response.getUserPermissionList()) { perms.add(ProtobufUtil.toUserPermission(perm)); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlClient.java index c50abc1e540..25ac01f1a1b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlClient.java @@ -31,10 +31,12 @@ import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.security.SecurityCapability; import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; +import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos; import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.AccessControlService.BlockingInterface; @@ -92,9 +94,12 @@ public class AccessControlClient { public static void grant(Connection connection, final TableName tableName, final String userName, final byte[] family, final byte[] qual, final Permission.Action... actions) throws Throwable { + PayloadCarryingRpcController controller + = ((ClusterConnection) connection).getRpcControllerFactory().newController(); + controller.setPriority(tableName); try (Table table = connection.getTable(ACL_TABLE_NAME)) { - ProtobufUtil.grant(getAccessControlServiceStub(table), userName, tableName, family, qual, - actions); + ProtobufUtil.grant(controller, getAccessControlServiceStub(table), userName, tableName, + family, qual, actions); } } @@ -108,8 +113,12 @@ public class AccessControlClient { */ public static void grant(Connection connection, final String namespace, final String userName, final Permission.Action... actions) throws Throwable { + PayloadCarryingRpcController controller + = ((ClusterConnection) connection).getRpcControllerFactory().newController(); + try (Table table = connection.getTable(ACL_TABLE_NAME)) { - ProtobufUtil.grant(getAccessControlServiceStub(table), userName, namespace, actions); + ProtobufUtil.grant(controller, getAccessControlServiceStub(table), userName, namespace, + actions); } } @@ -119,8 +128,10 @@ public class AccessControlClient { */ public static void grant(Connection connection, final String userName, final Permission.Action... actions) throws Throwable { + PayloadCarryingRpcController controller + = ((ClusterConnection) connection).getRpcControllerFactory().newController(); try (Table table = connection.getTable(ACL_TABLE_NAME)) { - ProtobufUtil.grant(getAccessControlServiceStub(table), userName, actions); + ProtobufUtil.grant(controller, getAccessControlServiceStub(table), userName, actions); } } @@ -144,9 +155,12 @@ public class AccessControlClient { public static void revoke(Connection connection, final TableName tableName, final String username, final byte[] family, final byte[] qualifier, final Permission.Action... actions) throws Throwable { + PayloadCarryingRpcController controller + = ((ClusterConnection) connection).getRpcControllerFactory().newController(); + controller.setPriority(tableName); try (Table table = connection.getTable(ACL_TABLE_NAME)) { - ProtobufUtil.revoke(getAccessControlServiceStub(table), username, tableName, family, - qualifier, actions); + ProtobufUtil.revoke(controller, getAccessControlServiceStub(table), username, tableName, + family, qualifier, actions); } } @@ -160,8 +174,11 @@ public class AccessControlClient { */ public static void revoke(Connection connection, final String namespace, final String userName, final Permission.Action... actions) throws Throwable { + PayloadCarryingRpcController controller + = ((ClusterConnection) connection).getRpcControllerFactory().newController(); try (Table table = connection.getTable(ACL_TABLE_NAME)) { - ProtobufUtil.revoke(getAccessControlServiceStub(table), userName, namespace, actions); + ProtobufUtil.revoke(controller, getAccessControlServiceStub(table), userName, namespace, + actions); } } @@ -171,10 +188,11 @@ public class AccessControlClient { */ public static void revoke(Connection connection, final String userName, final Permission.Action... actions) throws Throwable { + PayloadCarryingRpcController controller + = ((ClusterConnection) connection).getRpcControllerFactory().newController(); try (Table table = connection.getTable(ACL_TABLE_NAME)) { - ProtobufUtil.revoke(getAccessControlServiceStub(table), userName, actions); + ProtobufUtil.revoke(controller, getAccessControlServiceStub(table), userName, actions); } - } /** @@ -188,6 +206,8 @@ public class AccessControlClient { */ public static List getUserPermissions(Connection connection, String tableRegex) throws Throwable { + PayloadCarryingRpcController controller + = ((ClusterConnection) connection).getRpcControllerFactory().newController(); List permList = new ArrayList(); try (Table table = connection.getTable(ACL_TABLE_NAME)) { try (Admin admin = connection.getAdmin()) { @@ -196,14 +216,16 @@ public class AccessControlClient { AccessControlProtos.AccessControlService.newBlockingStub(service); HTableDescriptor[] htds = null; if (tableRegex == null || tableRegex.isEmpty()) { - permList = ProtobufUtil.getUserPermissions(protocol); + permList = ProtobufUtil.getUserPermissions(controller, protocol); } else if (tableRegex.charAt(0) == '@') { String namespace = tableRegex.substring(1); - permList = ProtobufUtil.getUserPermissions(protocol, Bytes.toBytes(namespace)); + permList = ProtobufUtil.getUserPermissions(controller, protocol, + Bytes.toBytes(namespace)); } else { htds = admin.listTables(Pattern.compile(tableRegex), true); for (HTableDescriptor hd : htds) { - permList.addAll(ProtobufUtil.getUserPermissions(protocol, hd.getTableName())); + permList.addAll(ProtobufUtil.getUserPermissions(controller, protocol, + hd.getTableName())); } } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java index 0b844a25135..0b53f9503ba 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java @@ -36,11 +36,14 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.NotAllMetaRegionsOnlineException; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.ClusterConnection; +import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.RegionReplicaUtil; import org.apache.hadoop.hbase.client.RetriesExhaustedException; import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.ipc.FailedServerException; +import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; import org.apache.hadoop.hbase.master.RegionState; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; @@ -287,7 +290,7 @@ public class MetaTableLocator { } catch (RegionServerStoppedException e) { // Pass -- server name sends us to a server that is dying or already dead. } - return (service != null) && verifyRegionLocation(service, + return (service != null) && verifyRegionLocation(hConnection, service, getMetaRegionLocation(zkw, replicaId), RegionReplicaUtil.getRegionInfoForReplica( HRegionInfo.FIRST_META_REGIONINFO, replicaId).getRegionName()); } @@ -307,17 +310,22 @@ public class MetaTableLocator { // rather than have to pass it in. Its made awkward by the fact that the // HRI is likely a proxy against remote server so the getServerName needs // to be fixed to go to a local method or to a cache before we can do this. - private boolean verifyRegionLocation(AdminService.BlockingInterface hostingServer, - final ServerName address, final byte [] regionName) + private boolean verifyRegionLocation(final Connection connection, + AdminService.BlockingInterface hostingServer, final ServerName address, + final byte [] regionName) throws IOException { if (hostingServer == null) { LOG.info("Passed hostingServer is null"); return false; } Throwable t; + PayloadCarryingRpcController controller = null; + if (connection instanceof ClusterConnection) { + controller = ((ClusterConnection) connection).getRpcControllerFactory().newController(); + } try { // Try and get regioninfo from the hosting server. - return ProtobufUtil.getRegionInfo(hostingServer, regionName) != null; + return ProtobufUtil.getRegionInfo(controller, hostingServer, regionName) != null; } catch (ConnectException e) { t = e; } catch (RetriesExhaustedException e) { @@ -594,7 +602,7 @@ public class MetaTableLocator { ServerName sn = null; while (true) { sn = getMetaRegionLocation(zkw, replicaId); - if (sn != null || (System.currentTimeMillis() - startTime) + if (sn != null || (System.currentTimeMillis() - startTime) > timeout - HConstants.SOCKET_RETRY_WAIT_MS) { break; } diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestSnapshotFromAdmin.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestSnapshotFromAdmin.java index 1b039bd241d..4d55c33e629 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestSnapshotFromAdmin.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestSnapshotFromAdmin.java @@ -28,6 +28,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsSnapshotDoneRequest; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsSnapshotDoneResponse; @@ -78,26 +80,34 @@ public class TestSnapshotFromAdmin { // setup the conf to match the expected properties conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, numRetries); conf.setLong("hbase.client.pause", pauseTime); + // mock the master admin to our mock MasterKeepAliveConnection mockMaster = Mockito.mock(MasterKeepAliveConnection.class); Mockito.when(mockConnection.getConfiguration()).thenReturn(conf); Mockito.when(mockConnection.getKeepAliveMasterService()).thenReturn(mockMaster); + // we need a real retrying caller + RpcRetryingCallerFactory callerFactory = new RpcRetryingCallerFactory(conf); + RpcControllerFactory controllerFactory = Mockito.mock(RpcControllerFactory.class); + Mockito.when(controllerFactory.newController()).thenReturn( + Mockito.mock(PayloadCarryingRpcController.class)); + Mockito.when(mockConnection.getRpcRetryingCallerFactory()).thenReturn(callerFactory); + Mockito.when(mockConnection.getRpcControllerFactory()).thenReturn(controllerFactory); // set the max wait time for the snapshot to complete SnapshotResponse response = SnapshotResponse.newBuilder() .setExpectedTimeout(maxWaitTime) .build(); Mockito - .when( - mockMaster.snapshot((RpcController) Mockito.isNull(), - Mockito.any(SnapshotRequest.class))).thenReturn(response); + .when( + mockMaster.snapshot((RpcController) Mockito.any(), + Mockito.any(SnapshotRequest.class))).thenReturn(response); // setup the response IsSnapshotDoneResponse.Builder builder = IsSnapshotDoneResponse.newBuilder(); builder.setDone(false); // first five times, we return false, last we get success Mockito.when( - mockMaster.isSnapshotDone((RpcController) Mockito.isNull(), + mockMaster.isSnapshotDone((RpcController) Mockito.any(), Mockito.any(IsSnapshotDoneRequest.class))).thenReturn(builder.build(), builder.build(), - builder.build(), builder.build(), builder.build(), builder.setDone(true).build()); + builder.build(), builder.build(), builder.build(), builder.setDone(true).build()); // setup the admin and run the test Admin admin = new HBaseAdmin(mockConnection); @@ -123,6 +133,13 @@ public class TestSnapshotFromAdmin { .mock(ConnectionImplementation.class); Configuration conf = HBaseConfiguration.create(); Mockito.when(mockConnection.getConfiguration()).thenReturn(conf); + // we need a real retrying caller + RpcRetryingCallerFactory callerFactory = new RpcRetryingCallerFactory(conf); + RpcControllerFactory controllerFactory = Mockito.mock(RpcControllerFactory.class); + Mockito.when(controllerFactory.newController()).thenReturn( + Mockito.mock(PayloadCarryingRpcController.class)); + Mockito.when(mockConnection.getRpcRetryingCallerFactory()).thenReturn(callerFactory); + Mockito.when(mockConnection.getRpcControllerFactory()).thenReturn(controllerFactory); Admin admin = new HBaseAdmin(mockConnection); SnapshotDescription.Builder builder = SnapshotDescription.newBuilder(); // check that invalid snapshot names fail @@ -142,11 +159,11 @@ public class TestSnapshotFromAdmin { Mockito.when(mockConnection.getKeepAliveMasterService()).thenReturn(master); SnapshotResponse response = SnapshotResponse.newBuilder().setExpectedTimeout(0).build(); Mockito.when( - master.snapshot((RpcController) Mockito.isNull(), Mockito.any(SnapshotRequest.class))) + master.snapshot((RpcController) Mockito.any(), Mockito.any(SnapshotRequest.class))) .thenReturn(response); IsSnapshotDoneResponse doneResponse = IsSnapshotDoneResponse.newBuilder().setDone(true).build(); Mockito.when( - master.isSnapshotDone((RpcController) Mockito.isNull(), + master.isSnapshotDone((RpcController) Mockito.any(), Mockito.any(IsSnapshotDoneRequest.class))).thenReturn(doneResponse); // make sure that we can use valid names diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/DistributedHBaseCluster.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/DistributedHBaseCluster.java index 6f3baa0aa3b..31ca996edb8 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/DistributedHBaseCluster.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/DistributedHBaseCluster.java @@ -279,7 +279,7 @@ public class DistributedHBaseCluster extends HBaseCluster { AdminProtos.AdminService.BlockingInterface client = ((ClusterConnection)this.connection).getAdmin(regionLoc.getServerName()); - ServerInfo info = ProtobufUtil.getServerInfo(client); + ServerInfo info = ProtobufUtil.getServerInfo(null, client); return ProtobufUtil.toServerName(info.getServerName()); } @@ -433,7 +433,7 @@ public class DistributedHBaseCluster extends HBaseCluster { Set toKill = new TreeSet(new ServerNameIgnoreStartCodeComparator()); toStart.addAll(initial.getServers()); toKill.addAll(current.getServers()); - + ServerName master = initial.getMaster(); for (ServerName server : current.getServers()) { diff --git a/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/RSStatusTmpl.jamon b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/RSStatusTmpl.jamon index 158a2393b4e..b98f50dbf96 100644 --- a/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/RSStatusTmpl.jamon +++ b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/RSStatusTmpl.jamon @@ -42,7 +42,7 @@ org.apache.hadoop.hbase.zookeeper.MasterAddressTracker; <%java return; %> <%java> - ServerInfo serverInfo = ProtobufUtil.getServerInfo(regionServer.getRSRpcServices()); + ServerInfo serverInfo = ProtobufUtil.getServerInfo(null, regionServer.getRSRpcServices()); ServerName serverName = ProtobufUtil.toServerName(serverInfo.getServerName()); List onlineRegions = ProtobufUtil.getOnlineRegions(regionServer.getRSRpcServices()); MasterAddressTracker masterAddressTracker = regionServer.getMasterAddressTracker(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java index 64a75b99809..a9cf0f1f253 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java @@ -22,7 +22,9 @@ import java.nio.channels.ClosedChannelException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.ipc.RpcServer.Call; import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; import org.apache.hadoop.hbase.util.Pair; @@ -38,7 +40,8 @@ import com.google.protobuf.Message; * {@link RpcScheduler}. Call {@link #run()} to actually execute the contained * RpcServer.Call */ -@InterfaceAudience.Private +@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX}) +@InterfaceStability.Evolving public class CallRunner { private static final Log LOG = LogFactory.getLog(CallRunner.class); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java index 2414e3deda1..91c152bbcd1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java @@ -37,7 +37,7 @@ public abstract class RpcScheduler { "hbase.ipc.server.priority.max.callqueue.length"; /** Exposes runtime information of a {@code RpcServer} that a {@code RpcScheduler} may need. */ - static abstract class Context { + public static abstract class Context { public abstract InetSocketAddress getListenerAddress(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index 2c6084a68b2..f0aed2eb97a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -289,7 +289,9 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { * Datastructure that holds all necessary to a method invocation and then afterward, carries * the result. */ - class Call implements RpcCallContext { + @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX}) + @InterfaceStability.Evolving + public class Call implements RpcCallContext { protected int id; // the client's call id protected BlockingService service; protected MethodDescriptor md; @@ -369,6 +371,14 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { return this.header; } + public boolean hasPriority() { + return this.header.hasPriority(); + } + + public int getPriority() { + return this.header.getPriority(); + } + /* * Short string representation without param info because param itself could be huge depends on * the payload of a command diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionPlacementMaintainer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionPlacementMaintainer.java index 196320d3cde..d31711ea123 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionPlacementMaintainer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionPlacementMaintainer.java @@ -695,7 +695,7 @@ public class RegionPlacementMaintainer { UpdateFavoredNodesResponse updateFavoredNodesResponse = currentRegionServer.updateFavoredNodes(null, request); LOG.info("Region server " + - ProtobufUtil.getServerInfo(currentRegionServer).getServerName() + + ProtobufUtil.getServerInfo(null, currentRegionServer).getServerName() + " has updated " + updateFavoredNodesResponse.getResponse() + " / " + singleServerPlan.getAssignmentMap().size() + " regions with the assignment plan"); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java index 341d51c7848..dabef713d00 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java @@ -50,6 +50,8 @@ import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.RetriesExhaustedException; +import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer; import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure; import org.apache.hadoop.hbase.monitoring.MonitoredTask; @@ -158,6 +160,7 @@ public class ServerManager { private final long warningSkew; private final RetryCounterFactory pingRetryCounterFactory; + private final RpcControllerFactory rpcControllerFactory; /** * Set of region servers which are dead but not processed immediately. If one @@ -222,6 +225,9 @@ public class ServerManager { int pingSleepInterval = Math.max(1, master.getConfiguration().getInt( "hbase.master.ping.server.retry.sleep.interval", 100)); this.pingRetryCounterFactory = new RetryCounterFactory(pingMaxAttempts, pingSleepInterval); + this.rpcControllerFactory = this.connection == null + ? null + : connection.getRpcControllerFactory(); } /** @@ -784,6 +790,10 @@ public class ServerManager { } } + private PayloadCarryingRpcController newRpcController() { + return rpcControllerFactory == null ? null : rpcControllerFactory.newController(); + } + /** * Sends an CLOSE RPC to the specified server to close the specified region. *

@@ -804,8 +814,8 @@ public class ServerManager { region.getRegionNameAsString() + " failed because no RPC connection found to this server"); } - return ProtobufUtil.closeRegion(admin, server, region.getRegionName(), - dest); + PayloadCarryingRpcController controller = newRpcController(); + return ProtobufUtil.closeRegion(controller, admin, server, region.getRegionName(), dest); } public boolean sendRegionClose(ServerName server, @@ -826,7 +836,8 @@ public class ServerManager { if (server == null) return; try { AdminService.BlockingInterface admin = getRsAdmin(server); - ProtobufUtil.warmupRegion(admin, region); + PayloadCarryingRpcController controller = newRpcController(); + ProtobufUtil.warmupRegion(controller, admin, region); } catch (IOException e) { LOG.error("Received exception in RPC for warmup server:" + server + "region: " + region + @@ -838,11 +849,12 @@ public class ServerManager { * Contacts a region server and waits up to timeout ms * to close the region. This bypasses the active hmaster. */ - public static void closeRegionSilentlyAndWait(ClusterConnection connection, + public static void closeRegionSilentlyAndWait(ClusterConnection connection, ServerName server, HRegionInfo region, long timeout) throws IOException, InterruptedException { AdminService.BlockingInterface rs = connection.getAdmin(server); + PayloadCarryingRpcController controller = connection.getRpcControllerFactory().newController(); try { - ProtobufUtil.closeRegion(rs, server, region.getRegionName()); + ProtobufUtil.closeRegion(controller, rs, server, region.getRegionName()); } catch (IOException e) { LOG.warn("Exception when closing region: " + region.getRegionNameAsString(), e); } @@ -850,12 +862,13 @@ public class ServerManager { while (System.currentTimeMillis() < expiration) { try { HRegionInfo rsRegion = - ProtobufUtil.getRegionInfo(rs, region.getRegionName()); + ProtobufUtil.getRegionInfo(controller, rs, region.getRegionName()); if (rsRegion == null) return; } catch (IOException ioe) { if (ioe instanceof NotServingRegionException) // no need to retry again return; - LOG.warn("Exception when retrieving regioninfo from: " + region.getRegionNameAsString(), ioe); + LOG.warn("Exception when retrieving regioninfo from: " + + region.getRegionNameAsString(), ioe); } Thread.sleep(1000); } @@ -890,7 +903,8 @@ public class ServerManager { + region_b.getRegionNameAsString() + " failed because no RPC connection found to this server"); } - ProtobufUtil.mergeRegions(admin, region_a, region_b, forcible, user); + PayloadCarryingRpcController controller = newRpcController(); + ProtobufUtil.mergeRegions(controller, admin, region_a, region_b, forcible, user); } /** @@ -899,12 +913,14 @@ public class ServerManager { public boolean isServerReachable(ServerName server) { if (server == null) throw new NullPointerException("Passed server is null"); + RetryCounter retryCounter = pingRetryCounterFactory.create(); while (retryCounter.shouldRetry()) { try { + PayloadCarryingRpcController controller = newRpcController(); AdminService.BlockingInterface admin = getRsAdmin(server); if (admin != null) { - ServerInfo info = ProtobufUtil.getServerInfo(admin); + ServerInfo info = ProtobufUtil.getServerInfo(controller, admin); return info != null && info.hasServerName() && server.getStartcode() == info.getServerName().getStartCode(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java index 3bd3c3f9426..002bdb2e73c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java @@ -693,8 +693,8 @@ public class MiniHBaseCluster extends HBaseCluster { int count = 0; for (JVMClusterUtil.RegionServerThread rst: getRegionServerThreads()) { HRegionServer hrs = rst.getRegionServer(); - Region metaRegion = hrs.getOnlineRegion(regionName); - if (metaRegion != null) { + Region region = hrs.getOnlineRegion(regionName); + if (region != null) { index = count; break; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestGlobalMemStoreSize.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestGlobalMemStoreSize.java index e4605354a43..8ac89da83e9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestGlobalMemStoreSize.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestGlobalMemStoreSize.java @@ -55,11 +55,11 @@ public class TestGlobalMemStoreSize { private HBaseTestingUtility TEST_UTIL; private MiniHBaseCluster cluster; - + /** * Test the global mem store size in the region server is equal to sum of each * region's mem store size - * @throws Exception + * @throws Exception */ @Test public void testGlobalMemStore() throws Exception { @@ -87,8 +87,8 @@ public class TestGlobalMemStoreSize { for (HRegionServer server : getOnlineRegionServers()) { long globalMemStoreSize = 0; for (HRegionInfo regionInfo : - ProtobufUtil.getOnlineRegions(server.getRSRpcServices())) { - globalMemStoreSize += + ProtobufUtil.getOnlineRegions(null, server.getRSRpcServices())) { + globalMemStoreSize += server.getFromOnlineRegions(regionInfo.getEncodedName()). getMemstoreSize(); } @@ -103,7 +103,7 @@ public class TestGlobalMemStoreSize { ", size=" + server.getRegionServerAccounting().getGlobalMemstoreSize()); for (HRegionInfo regionInfo : - ProtobufUtil.getOnlineRegions(server.getRSRpcServices())) { + ProtobufUtil.getOnlineRegions(null, server.getRSRpcServices())) { Region r = server.getFromOnlineRegions(regionInfo.getEncodedName()); flush(r, server); } @@ -119,7 +119,7 @@ public class TestGlobalMemStoreSize { // If size > 0, see if its because the meta region got edits while // our test was running.... for (HRegionInfo regionInfo : - ProtobufUtil.getOnlineRegions(server.getRSRpcServices())) { + ProtobufUtil.getOnlineRegions(null, server.getRSRpcServices())) { Region r = server.getFromOnlineRegions(regionInfo.getEncodedName()); long l = r.getMemstoreSize(); if (l > 0) { @@ -154,7 +154,7 @@ public class TestGlobalMemStoreSize { private List getOnlineRegionServers() { List list = new ArrayList(); - for (JVMClusterUtil.RegionServerThread rst : + for (JVMClusterUtil.RegionServerThread rst : cluster.getRegionServerThreads()) { if (rst.getRegionServer().isOnline()) { list.add(rst.getRegionServer()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java index 7e934c0de6f..8b84452431c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java @@ -44,6 +44,13 @@ import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.RegionLocator; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.ipc.CallRunner; +import org.apache.hadoop.hbase.ipc.DelegatingRpcScheduler; +import org.apache.hadoop.hbase.ipc.PriorityFunction; +import org.apache.hadoop.hbase.ipc.RpcScheduler; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.RSRpcServices; +import org.apache.hadoop.hbase.regionserver.SimpleRpcSchedulerFactory; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.MiscTests; import org.apache.hadoop.hbase.util.Bytes; @@ -618,5 +625,77 @@ public class TestMetaTableAccessor { meta.close(); } } + + public static class SpyingRpcSchedulerFactory extends SimpleRpcSchedulerFactory { + @Override + public RpcScheduler create(Configuration conf, PriorityFunction priority, Abortable server) { + final RpcScheduler delegate = super.create(conf, priority, server); + return new SpyingRpcScheduler(delegate); + } + } + + public static class SpyingRpcScheduler extends DelegatingRpcScheduler { + long numPriorityCalls = 0; + + public SpyingRpcScheduler(RpcScheduler delegate) { + super(delegate); + } + + @Override + public boolean dispatch(CallRunner task) throws IOException, InterruptedException { + int priority = task.getCall().getPriority(); + + if (priority > HConstants.QOS_THRESHOLD) { + numPriorityCalls++; + } + return super.dispatch(task); + } + } + + @Test + public void testMetaUpdatesGoToPriorityQueue() throws Exception { + // This test has to be end-to-end, and do the verification from the server side + Configuration c = UTIL.getConfiguration(); + + c.set(RSRpcServices.REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS, + SpyingRpcSchedulerFactory.class.getName()); + + // restart so that new config takes place + afterClass(); + beforeClass(); + + TableName tableName = TableName.valueOf("foo"); + try (Admin admin = connection.getAdmin(); + RegionLocator rl = connection.getRegionLocator(tableName)) { + + // create a table and prepare for a manual split + UTIL.createTable(tableName, "cf1"); + + HRegionLocation loc = rl.getAllRegionLocations().get(0); + HRegionInfo parent = loc.getRegionInfo(); + long rid = 1000; + byte[] splitKey = Bytes.toBytes("a"); + HRegionInfo splitA = new HRegionInfo(parent.getTable(), parent.getStartKey(), + splitKey, false, rid); + HRegionInfo splitB = new HRegionInfo(parent.getTable(), splitKey, + parent.getEndKey(), false, rid); + + // find the meta server + MiniHBaseCluster cluster = UTIL.getMiniHBaseCluster(); + int rsIndex = cluster.getServerWithMeta(); + HRegionServer rs; + if (rsIndex >= 0) { + rs = cluster.getRegionServer(rsIndex); + } else { + // it is in master + rs = cluster.getMaster(); + } + SpyingRpcScheduler scheduler = (SpyingRpcScheduler) rs.getRpcServer().getScheduler(); + long prevCalls = scheduler.numPriorityCalls; + MetaTableAccessor.splitRegion(connection, parent, splitA, splitB, loc.getServerName(), 1); + + assertTrue(prevCalls < scheduler.numPriorityCalls); + } + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableLocator.java index 99437499d4a..ba6e1d40016 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableLocator.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableLocator.java @@ -31,6 +31,8 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.HConnectionTestingUtility; +import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; import org.apache.hadoop.hbase.master.RegionState; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos; @@ -180,7 +182,7 @@ public class TestMetaTableLocator { // Mock an ClientProtocol. final ClientProtos.ClientService.BlockingInterface implementation = Mockito.mock(ClientProtos.ClientService.BlockingInterface.class); - + ClusterConnection connection = mockConnection(null, implementation); // If a 'get' is called on mocked interface, throw connection refused. @@ -250,6 +252,10 @@ public class TestMetaTableLocator { (GetRegionInfoRequest)Mockito.any())).thenThrow(connectException); Mockito.when(connection.getAdmin(Mockito.any(ServerName.class))). thenReturn(implementation); + RpcControllerFactory controllerFactory = Mockito.mock(RpcControllerFactory.class); + Mockito.when(controllerFactory.newController()).thenReturn( + Mockito.mock(PayloadCarryingRpcController.class)); + Mockito.when(connection.getRpcControllerFactory()).thenReturn(controllerFactory); ServerName sn = ServerName.valueOf("example.com", 1234, System.currentTimeMillis()); MetaTableLocator.setMetaLocation(this.watcher, diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java index 24c0c32cb63..dc1ecf1d32c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java @@ -54,6 +54,11 @@ public class HConnectionTestingUtility { throws ZooKeeperConnectionException { ConnectionImplementation connection = Mockito.mock(ConnectionImplementation.class); Mockito.when(connection.getConfiguration()).thenReturn(conf); + Mockito.when(connection.getRpcControllerFactory()).thenReturn( + Mockito.mock(RpcControllerFactory.class)); + // we need a real retrying caller + RpcRetryingCallerFactory callerFactory = new RpcRetryingCallerFactory(conf); + Mockito.when(connection.getRpcRetryingCallerFactory()).thenReturn(callerFactory); return connection; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin1.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin1.java index df8f4f68aa6..10dbed083f8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin1.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin1.java @@ -1254,8 +1254,8 @@ public class TestAdmin1 { try { AdminService.BlockingInterface admin = TEST_UTIL.getHBaseAdmin().getConnection() .getAdmin(regions.get(1).getSecond()); - ProtobufUtil.mergeRegions(admin, regions.get(1).getFirst(), regions.get(2).getFirst(), true, - null); + ProtobufUtil.mergeRegions(null, admin, regions.get(1).getFirst(), regions.get(2).getFirst(), + true, null); } catch (MergeRegionException mm) { gotException = true; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java index 3b88184f14d..520f2107acf 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java @@ -604,7 +604,7 @@ public class TestFromClientSide { public void testMaxKeyValueSize() throws Exception { TableName TABLE = TableName.valueOf("testMaxKeyValueSize"); Configuration conf = TEST_UTIL.getConfiguration(); - String oldMaxSize = conf.get(TableConfiguration.MAX_KEYVALUE_SIZE_KEY); + String oldMaxSize = conf.get(ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY); Table ht = TEST_UTIL.createTable(TABLE, FAMILY); byte[] value = new byte[4 * 1024 * 1024]; Put put = new Put(ROW); @@ -612,7 +612,7 @@ public class TestFromClientSide { ht.put(put); try { TEST_UTIL.getConfiguration().setInt( - TableConfiguration.MAX_KEYVALUE_SIZE_KEY, 2 * 1024 * 1024); + ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY, 2 * 1024 * 1024); // Create new table so we pick up the change in Configuration. try (Connection connection = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration())) { @@ -624,7 +624,7 @@ public class TestFromClientSide { } fail("Inserting a too large KeyValue worked, should throw exception"); } catch(Exception e) {} - conf.set(TableConfiguration.MAX_KEYVALUE_SIZE_KEY, oldMaxSize); + conf.set(ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY, oldMaxSize); } @Test diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java index 5995191b2c7..ddd5fa3b928 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java @@ -270,20 +270,20 @@ public class TestFromClientSide3 { // create an empty Put Put put1 = new Put(ROW); actions.add(put1); - + Put put2 = new Put(ANOTHERROW); put2.addColumn(FAMILY, QUALIFIER, VALUE); actions.add(put2); - + table.batch(actions, results); fail("Empty Put should have failed the batch call"); } catch (IllegalArgumentException iae) { - + } finally { table.close(); } } - + @Test public void testHTableExistsMethodSingleRegionSingleGet() throws Exception { // Test with a single region table. @@ -401,7 +401,7 @@ public class TestFromClientSide3 { @Test public void testHTableExistsMethodMultipleRegionsMultipleGets() throws Exception { Table table = TEST_UTIL.createTable( - TableName.valueOf("testHTableExistsMethodMultipleRegionsMultipleGets"), + TableName.valueOf("testHTableExistsMethodMultipleRegionsMultipleGets"), new byte[][] { FAMILY }, 1, new byte[] { 0x00 }, new byte[] { (byte) 0xff }, 255); Put put = new Put(ROW); put.addColumn(FAMILY, QUALIFIER, VALUE); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHBaseAdminNoCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHBaseAdminNoCluster.java index 8b2b7335bd2..45093bbde78 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHBaseAdminNoCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHBaseAdminNoCluster.java @@ -35,6 +35,8 @@ import org.apache.hadoop.hbase.MasterNotRunningException; import org.apache.hadoop.hbase.PleaseHoldException; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.ZooKeeperConnectionException; +import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.BalanceRequest; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateTableRequest; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DispatchMergingRegionsRequest; @@ -310,6 +312,14 @@ public class TestHBaseAdminNoCluster { } }); Mockito.when(connection.getKeepAliveMasterService()).thenReturn(masterAdmin); + RpcControllerFactory rpcControllerFactory = Mockito.mock(RpcControllerFactory.class); + Mockito.when(connection.getRpcControllerFactory()).thenReturn(rpcControllerFactory); + Mockito.when(rpcControllerFactory.newController()).thenReturn( + Mockito.mock(PayloadCarryingRpcController.class)); + + // we need a real retrying caller + RpcRetryingCallerFactory callerFactory = new RpcRetryingCallerFactory(configuration); + Mockito.when(connection.getRpcRetryingCallerFactory()).thenReturn(callerFactory); Admin admin = null; try { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java index 77771ba1cf8..d4d319a9956 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java @@ -285,7 +285,7 @@ public class TestScannersFromClientSide { private void verifyExpectedCounts(Table table, Scan scan, int expectedRowCount, int expectedCellCount) throws Exception { ResultScanner scanner = table.getScanner(scan); - + int rowCount = 0; int cellCount = 0; Result r = null; @@ -609,7 +609,7 @@ public class TestScannersFromClientSide { byte[] regionName = hri.getRegionName(); int i = cluster.getServerWith(regionName); HRegionServer rs = cluster.getRegionServer(i); - ProtobufUtil.closeRegion( + ProtobufUtil.closeRegion(null, rs.getRSRpcServices(), rs.getServerName(), regionName); long startTime = EnvironmentEdgeManager.currentTime(); long timeOut = 300000; @@ -627,7 +627,7 @@ public class TestScannersFromClientSide { RegionStates states = master.getAssignmentManager().getRegionStates(); states.regionOffline(hri); states.updateRegionState(hri, State.OPENING); - ProtobufUtil.openRegion(rs.getRSRpcServices(), rs.getServerName(), hri); + ProtobufUtil.openRegion(null, rs.getRSRpcServices(), rs.getServerName(), hri); startTime = EnvironmentEdgeManager.currentTime(); while (true) { if (rs.getOnlineRegion(regionName) != null) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/DelegatingRpcScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/DelegatingRpcScheduler.java new file mode 100644 index 00000000000..b1b3b23e1a7 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/DelegatingRpcScheduler.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.ipc; + +import java.io.IOException; + +public class DelegatingRpcScheduler extends RpcScheduler { + protected RpcScheduler delegate; + + public DelegatingRpcScheduler(RpcScheduler delegate) { + this.delegate = delegate; + } + + @Override + public void stop() { + delegate.stop(); + } + @Override + public void start() { + delegate.start(); + } + @Override + public void init(Context context) { + delegate.init(context); + } + @Override + public int getReplicationQueueLength() { + return delegate.getReplicationQueueLength(); + } + + @Override + public int getPriorityQueueLength() { + return delegate.getPriorityQueueLength(); + } + + @Override + public int getGeneralQueueLength() { + return delegate.getGeneralQueueLength(); + } + + @Override + public int getActiveRpcHandlerCount() { + return delegate.getActiveRpcHandlerCount(); + } + + @Override + public boolean dispatch(CallRunner task) throws IOException, InterruptedException { + return delegate.dispatch(task); + } + + @Override + public long getNumGeneralCallsDropped() { + return delegate.getNumGeneralCallsDropped(); + } + + @Override + public long getNumLifoModeSwitches() { + return delegate.getNumLifoModeSwitches(); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java index 052e05c4e4c..32e3058bbf3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java @@ -196,7 +196,7 @@ public class TestLoadIncrementalHFilesSplitRecovery { ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices())) { if (hri.getTable().equals(table)) { // splitRegion doesn't work if startkey/endkey are null - ProtobufUtil.split(hrs.getRSRpcServices(), hri, rowkey(ROWCOUNT / 2)); // hard code split + ProtobufUtil.split(null, hrs.getRSRpcServices(), hri, rowkey(ROWCOUNT / 2)); } } @@ -480,6 +480,7 @@ public class TestLoadIncrementalHFilesSplitRecovery { final AtomicInteger countedLqis = new AtomicInteger(); LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration()) { + @Override protected List groupOrSplit( Multimap regionGroups, final LoadQueueItem item, final Table htable, diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestClockSkewDetection.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestClockSkewDetection.java index 142437c4d39..b2ffc3e1d81 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestClockSkewDetection.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestClockSkewDetection.java @@ -19,6 +19,8 @@ package org.apache.hadoop.hbase.master; import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import java.net.InetAddress; @@ -32,6 +34,7 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.client.ClusterConnection; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest; import org.apache.hadoop.hbase.testclassification.MasterTests; import org.apache.hadoop.hbase.testclassification.SmallTests; @@ -81,7 +84,7 @@ public class TestClockSkewDetection { @Override public void abort(String why, Throwable e) {} - + @Override public boolean isAborted() { return false; @@ -103,10 +106,11 @@ public class TestClockSkewDetection { @Override public ClusterConnection getClusterConnection() { - // TODO Auto-generated method stub - return null; + ClusterConnection conn = mock(ClusterConnection.class); + when(conn.getRpcControllerFactory()).thenReturn(mock(RpcControllerFactory.class)); + return conn; } - }, null, false); + }, null, true); LOG.debug("regionServerStartup 1"); InetAddress ia1 = InetAddress.getLocalHost(); @@ -135,7 +139,7 @@ public class TestClockSkewDetection { //we want an exception LOG.info("Recieved expected exception: "+e); } - + try { // Master Time < Region Server Time LOG.debug("Test: Master Time < Region Server Time"); @@ -151,7 +155,7 @@ public class TestClockSkewDetection { // we want an exception LOG.info("Recieved expected exception: " + e); } - + // make sure values above warning threshold but below max threshold don't kill LOG.debug("regionServerStartup 4"); InetAddress ia4 = InetAddress.getLocalHost(); @@ -160,7 +164,7 @@ public class TestClockSkewDetection { request.setServerStartCode(-1); request.setServerCurrentTime(System.currentTimeMillis() - warningSkew * 2); sm.regionServerStartup(request.build(), ia4); - + // make sure values above warning threshold but below max threshold don't kill LOG.debug("regionServerStartup 5"); InetAddress ia5 = InetAddress.getLocalHost(); @@ -169,7 +173,7 @@ public class TestClockSkewDetection { request.setServerStartCode(-1); request.setServerCurrentTime(System.currentTimeMillis() + warningSkew * 2); sm.regionServerStartup(request.build(), ia5); - + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java index 03b43f1bfd1..0ee75a87336 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java @@ -114,7 +114,7 @@ public class TestRegionServerNoMaster { return; } - ProtobufUtil.openRegion(hrs.getRSRpcServices(), + ProtobufUtil.openRegion(null, hrs.getRSRpcServices(), hrs.getServerName(), HRegionInfo.FIRST_META_REGIONINFO); while (true) { sn = mtl.getMetaRegionLocation(zkw); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/SecureTestUtil.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/SecureTestUtil.java index cbc1a905f30..22a9748eee2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/SecureTestUtil.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/SecureTestUtil.java @@ -372,7 +372,7 @@ public class SecureTestUtil { BlockingRpcChannel service = acl.coprocessorService(HConstants.EMPTY_START_ROW); AccessControlService.BlockingInterface protocol = AccessControlService.newBlockingStub(service); - ProtobufUtil.grant(protocol, user, actions); + ProtobufUtil.grant(null, protocol, user, actions); } } return null; @@ -395,7 +395,7 @@ public class SecureTestUtil { BlockingRpcChannel service = acl.coprocessorService(HConstants.EMPTY_START_ROW); AccessControlService.BlockingInterface protocol = AccessControlService.newBlockingStub(service); - ProtobufUtil.revoke(protocol, user, actions); + ProtobufUtil.revoke(null, protocol, user, actions); } } return null; @@ -418,7 +418,7 @@ public class SecureTestUtil { BlockingRpcChannel service = acl.coprocessorService(HConstants.EMPTY_START_ROW); AccessControlService.BlockingInterface protocol = AccessControlService.newBlockingStub(service); - ProtobufUtil.grant(protocol, user, namespace, actions); + ProtobufUtil.grant(null, protocol, user, namespace, actions); } } return null; @@ -483,7 +483,7 @@ public class SecureTestUtil { BlockingRpcChannel service = acl.coprocessorService(HConstants.EMPTY_START_ROW); AccessControlService.BlockingInterface protocol = AccessControlService.newBlockingStub(service); - ProtobufUtil.revoke(protocol, user, namespace, actions); + ProtobufUtil.revoke(null, protocol, user, namespace, actions); } } return null; @@ -507,7 +507,7 @@ public class SecureTestUtil { BlockingRpcChannel service = acl.coprocessorService(HConstants.EMPTY_START_ROW); AccessControlService.BlockingInterface protocol = AccessControlService.newBlockingStub(service); - ProtobufUtil.grant(protocol, user, table, family, qualifier, actions); + ProtobufUtil.grant(null, protocol, user, table, family, qualifier, actions); } } return null; @@ -573,7 +573,7 @@ public class SecureTestUtil { BlockingRpcChannel service = acl.coprocessorService(HConstants.EMPTY_START_ROW); AccessControlService.BlockingInterface protocol = AccessControlService.newBlockingStub(service); - ProtobufUtil.revoke(protocol, user, table, family, qualifier, actions); + ProtobufUtil.revoke(null, protocol, user, table, family, qualifier, actions); } } return null; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java index 7b1454d55ae..37c42a04eeb 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java @@ -1165,7 +1165,7 @@ public class TestAccessController extends SecureTestUtil { BlockingRpcChannel service = acl.coprocessorService(TEST_TABLE.getName()); AccessControlService.BlockingInterface protocol = AccessControlService.newBlockingStub(service); - ProtobufUtil.grant(protocol, USER_RO.getShortName(), TEST_TABLE, TEST_FAMILY, null, + ProtobufUtil.grant(null, protocol, USER_RO.getShortName(), TEST_TABLE, TEST_FAMILY, null, Action.READ); } return null; @@ -1180,7 +1180,7 @@ public class TestAccessController extends SecureTestUtil { BlockingRpcChannel service = acl.coprocessorService(TEST_TABLE.getName()); AccessControlService.BlockingInterface protocol = AccessControlService.newBlockingStub(service); - ProtobufUtil.revoke(protocol, USER_RO.getShortName(), TEST_TABLE, TEST_FAMILY, null, + ProtobufUtil.revoke(null, protocol, USER_RO.getShortName(), TEST_TABLE, TEST_FAMILY, null, Action.READ); } return null; @@ -1195,7 +1195,7 @@ public class TestAccessController extends SecureTestUtil { BlockingRpcChannel service = acl.coprocessorService(TEST_TABLE.getName()); AccessControlService.BlockingInterface protocol = AccessControlService.newBlockingStub(service); - ProtobufUtil.getUserPermissions(protocol, TEST_TABLE); + ProtobufUtil.getUserPermissions(null, protocol, TEST_TABLE); } return null; } @@ -1209,7 +1209,7 @@ public class TestAccessController extends SecureTestUtil { BlockingRpcChannel service = acl.coprocessorService(HConstants.EMPTY_START_ROW); AccessControlService.BlockingInterface protocol = AccessControlService.newBlockingStub(service); - ProtobufUtil.getUserPermissions(protocol); + ProtobufUtil.getUserPermissions(null, protocol); } return null; } @@ -1620,7 +1620,7 @@ public class TestAccessController extends SecureTestUtil { BlockingRpcChannel service = acl.coprocessorService(tableName.getName()); AccessControlService.BlockingInterface protocol = AccessControlService.newBlockingStub(service); - perms = ProtobufUtil.getUserPermissions(protocol, tableName); + perms = ProtobufUtil.getUserPermissions(null, protocol, tableName); } finally { acl.close(); } @@ -1647,7 +1647,7 @@ public class TestAccessController extends SecureTestUtil { BlockingRpcChannel service = acl.coprocessorService(tableName.getName()); AccessControlService.BlockingInterface protocol = AccessControlService.newBlockingStub(service); - perms = ProtobufUtil.getUserPermissions(protocol, tableName); + perms = ProtobufUtil.getUserPermissions(null, protocol, tableName); } finally { acl.close(); } @@ -1671,7 +1671,7 @@ public class TestAccessController extends SecureTestUtil { BlockingRpcChannel service = acl.coprocessorService(tableName.getName()); AccessControlService.BlockingInterface protocol = AccessControlService.newBlockingStub(service); - perms = ProtobufUtil.getUserPermissions(protocol, tableName); + perms = ProtobufUtil.getUserPermissions(null, protocol, tableName); } finally { acl.close(); } @@ -1691,7 +1691,7 @@ public class TestAccessController extends SecureTestUtil { BlockingRpcChannel service = acl.coprocessorService(tableName.getName()); AccessControlService.BlockingInterface protocol = AccessControlService.newBlockingStub(service); - perms = ProtobufUtil.getUserPermissions(protocol, tableName); + perms = ProtobufUtil.getUserPermissions(null, protocol, tableName); } finally { acl.close(); } @@ -1711,7 +1711,7 @@ public class TestAccessController extends SecureTestUtil { BlockingRpcChannel service = acl.coprocessorService(tableName.getName()); AccessControlService.BlockingInterface protocol = AccessControlService.newBlockingStub(service); - perms = ProtobufUtil.getUserPermissions(protocol, tableName); + perms = ProtobufUtil.getUserPermissions(null, protocol, tableName); } finally { acl.close(); } @@ -1734,7 +1734,7 @@ public class TestAccessController extends SecureTestUtil { BlockingRpcChannel service = acl.coprocessorService(HConstants.EMPTY_START_ROW); AccessControlService.BlockingInterface protocol = AccessControlService.newBlockingStub(service); - perms = ProtobufUtil.getUserPermissions(protocol); + perms = ProtobufUtil.getUserPermissions(null, protocol); } finally { acl.close(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestNamespaceCommands.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestNamespaceCommands.java index 96cd2991db7..d5834fd9413 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestNamespaceCommands.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestNamespaceCommands.java @@ -233,6 +233,7 @@ public class TestNamespaceCommands extends SecureTestUtil { @Test public void testModifyNamespace() throws Exception { AccessTestAction modifyNamespace = new AccessTestAction() { + @Override public Object run() throws Exception { ACCESS_CONTROLLER.preModifyNamespace(ObserverContext.createAndPrepare(CP_ENV, null), NamespaceDescriptor.create(TEST_NAMESPACE).addConfiguration("abc", "156").build()); @@ -359,7 +360,7 @@ public class TestNamespaceCommands extends SecureTestUtil { acl.coprocessorService(HConstants.EMPTY_START_ROW); AccessControlService.BlockingInterface protocol = AccessControlService.newBlockingStub(service); - ProtobufUtil.grant(protocol, testUser, TEST_NAMESPACE, Action.WRITE); + ProtobufUtil.grant(null, protocol, testUser, TEST_NAMESPACE, Action.WRITE); } finally { acl.close(); connection.close(); @@ -376,7 +377,7 @@ public class TestNamespaceCommands extends SecureTestUtil { BlockingRpcChannel service = acl.coprocessorService(HConstants.EMPTY_START_ROW); AccessControlService.BlockingInterface protocol = AccessControlService.newBlockingStub(service); - ProtobufUtil.grant(protocol, USER_GROUP_NS_ADMIN.getShortName(), + ProtobufUtil.grant(null, protocol, USER_GROUP_NS_ADMIN.getShortName(), TEST_NAMESPACE, Action.READ); } return null; @@ -384,6 +385,7 @@ public class TestNamespaceCommands extends SecureTestUtil { }; AccessTestAction revokeAction = new AccessTestAction() { + @Override public Object run() throws Exception { Connection connection = ConnectionFactory.createConnection(conf); Table acl = connection.getTable(AccessControlLists.ACL_TABLE_NAME); @@ -392,7 +394,7 @@ public class TestNamespaceCommands extends SecureTestUtil { acl.coprocessorService(HConstants.EMPTY_START_ROW); AccessControlService.BlockingInterface protocol = AccessControlService.newBlockingStub(service); - ProtobufUtil.revoke(protocol, testUser, TEST_NAMESPACE, Action.WRITE); + ProtobufUtil.revoke(null, protocol, testUser, TEST_NAMESPACE, Action.WRITE); } finally { acl.close(); connection.close(); @@ -402,6 +404,7 @@ public class TestNamespaceCommands extends SecureTestUtil { }; AccessTestAction revokeNamespaceAction = new AccessTestAction() { + @Override public Object run() throws Exception { Connection connection = ConnectionFactory.createConnection(conf); Table acl = connection.getTable(AccessControlLists.ACL_TABLE_NAME); @@ -410,7 +413,7 @@ public class TestNamespaceCommands extends SecureTestUtil { acl.coprocessorService(HConstants.EMPTY_START_ROW); AccessControlService.BlockingInterface protocol = AccessControlService.newBlockingStub(service); - ProtobufUtil.revoke(protocol, USER_GROUP_NS_ADMIN.getShortName(), + ProtobufUtil.revoke(null, protocol, USER_GROUP_NS_ADMIN.getShortName(), TEST_NAMESPACE, Action.READ); } finally { acl.close(); @@ -429,7 +432,7 @@ public class TestNamespaceCommands extends SecureTestUtil { BlockingRpcChannel service = acl.coprocessorService(HConstants.EMPTY_START_ROW); AccessControlService.BlockingInterface protocol = AccessControlService.newBlockingStub(service); - ProtobufUtil.getUserPermissions(protocol, Bytes.toBytes(TEST_NAMESPACE)); + ProtobufUtil.getUserPermissions(null, protocol, Bytes.toBytes(TEST_NAMESPACE)); } finally { acl.close(); connection.close();