From c61cb7fb55124547a36a6ef56afaec43676039f8 Mon Sep 17 00:00:00 2001 From: Jesse Yates Date: Thu, 22 May 2014 23:26:54 -0700 Subject: [PATCH] HBASE-11048 Support setting custom priority per client RPC --- .../hadoop/hbase/client/AsyncProcess.java | 8 +- .../hadoop/hbase/client/ClientScanner.java | 46 ++-- .../client/ClientSmallReversedScanner.java | 2 +- .../hbase/client/ClientSmallScanner.java | 28 ++- .../hbase/client/ConnectionManager.java | 5 +- .../apache/hadoop/hbase/client/HTable.java | 41 ++-- .../hbase/client/MultiServerCallable.java | 9 +- .../hbase/client/ReversedClientScanner.java | 5 +- .../hbase/client/ReversedScannerCallable.java | 20 +- .../hadoop/hbase/client/ScannerCallable.java | 16 +- ...elegatingPayloadCarryingRpcController.java | 58 +++++ .../hbase/ipc/RpcControllerFactory.java | 59 +++++ .../hadoop/hbase/client/TestAsyncProcess.java | 5 +- .../regionserver/wal/WALEditsReplaySink.java | 5 +- .../client/HConnectionTestingUtility.java | 6 +- .../client/TestRpcControllerFactory.java | 203 ++++++++++++++++++ 16 files changed, 450 insertions(+), 66 deletions(-) create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingPayloadCarryingRpcController.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcControllerFactory.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java index b31e5e35d44..433322fc397 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java @@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.coprocessor.Batch; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.htrace.Trace; @@ -120,6 +121,7 @@ class AsyncProcess { protected final ClusterConnection hConnection; protected final RpcRetryingCallerFactory rpcCallerFactory; + protected final RpcControllerFactory rpcFactory; protected final BatchErrors globalErrors; protected final ExecutorService pool; @@ -188,7 +190,7 @@ class AsyncProcess { } public AsyncProcess(ClusterConnection hc, Configuration conf, ExecutorService pool, - RpcRetryingCallerFactory rpcCaller, boolean useGlobalErrors) { + RpcRetryingCallerFactory rpcCaller, boolean useGlobalErrors, RpcControllerFactory rpcFactory) { if (hc == null) { throw new IllegalArgumentException("HConnection cannot be null."); } @@ -242,8 +244,8 @@ class AsyncProcess { serverTrackerTimeout += ConnectionUtils.getPauseTime(this.pause, i); } - this.rpcCallerFactory = rpcCaller; + this.rpcFactory = rpcFactory; } private ExecutorService getPool(ExecutorService pool) { @@ -950,7 +952,7 @@ class AsyncProcess { @VisibleForTesting protected MultiServerCallable createCallable(final ServerName server, TableName tableName, final MultiAction multi) { - return new MultiServerCallable(hConnection, tableName, server, multi); + return new MultiServerCallable(hConnection, tableName, server, this.rpcFactory, multi); } /** diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java index 574d937b46d..05bde7076d5 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.UnknownScannerException; import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos; import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; @@ -66,6 +67,7 @@ public class ClientScanner extends AbstractClientScanner { protected final int scannerTimeout; protected boolean scanMetricsPublished = false; protected RpcRetryingCaller caller; + protected RpcControllerFactory rpcControllerFactory; /** * Create a new ClientScanner for the specified table. An HConnection will be @@ -93,27 +95,36 @@ public class ClientScanner extends AbstractClientScanner { /** - * Create a new ClientScanner for the specified table - * Note that the passed {@link Scan}'s start row maybe changed changed. - * - * @param conf The {@link Configuration} to use. - * @param scan {@link Scan} to use in this scanner - * @param tableName The table that we wish to scan - * @param connection Connection identifying the cluster - * @throws IOException - */ + * @deprecated use + * {@link #ClientScanner(Configuration, Scan, TableName, HConnection, RpcRetryingCallerFactory, RpcControllerFactory)} + */ + @Deprecated public ClientScanner(final Configuration conf, final Scan scan, final TableName tableName, HConnection connection) throws IOException { - this(conf, scan, tableName, connection, new RpcRetryingCallerFactory(conf)); + this(conf, scan, tableName, connection, new RpcRetryingCallerFactory(conf), + RpcControllerFactory.instantiate(conf)); } /** - * @deprecated Use {@link #ClientScanner(Configuration, Scan, TableName, HConnection)} + * @deprecated Use + * {@link #ClientScanner(Configuration, Scan, TableName, HConnection, RpcRetryingCallerFactory, RpcControllerFactory)} */ @Deprecated public ClientScanner(final Configuration conf, final Scan scan, final byte [] tableName, HConnection connection) throws IOException { - this(conf, scan, TableName.valueOf(tableName), connection, new RpcRetryingCallerFactory(conf)); + this(conf, scan, TableName.valueOf(tableName), connection, new RpcRetryingCallerFactory(conf), + RpcControllerFactory.instantiate(conf)); + } + + /** + * @deprecated Use + * {@link #ClientScanner(Configuration, Scan, TableName, HConnection, RpcRetryingCallerFactory, RpcControllerFactory) + * instead. + */ + @Deprecated + public ClientScanner(final Configuration conf, final Scan scan, final TableName tableName, + HConnection connection, RpcRetryingCallerFactory rpcFactory) throws IOException { + this(conf, scan, tableName, connection, rpcFactory, RpcControllerFactory.instantiate(conf)); } /** @@ -126,7 +137,8 @@ public class ClientScanner extends AbstractClientScanner { * @throws IOException */ public ClientScanner(final Configuration conf, final Scan scan, final TableName tableName, - HConnection connection, RpcRetryingCallerFactory rpcFactory) throws IOException { + HConnection connection, RpcRetryingCallerFactory rpcFactory, + RpcControllerFactory controllerFactory) throws IOException { if (LOG.isTraceEnabled()) { LOG.trace("Scan table=" + tableName + ", startRow=" + Bytes.toStringBinary(scan.getStartRow())); @@ -159,7 +171,8 @@ public class ClientScanner extends AbstractClientScanner { HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING); } - this.caller = rpcFactory. newCaller(); + this.caller = rpcFactory. newCaller(); + this.rpcControllerFactory = controllerFactory; initializeScannerInConstruction(); } @@ -277,8 +290,9 @@ public class ClientScanner extends AbstractClientScanner { protected ScannerCallable getScannerCallable(byte [] localStartKey, int nbRows) { scan.setStartRow(localStartKey); - ScannerCallable s = new ScannerCallable(getConnection(), - getTable(), scan, this.scanMetrics); + ScannerCallable s = + new ScannerCallable(getConnection(), getTable(), scan, this.scanMetrics, + this.rpcControllerFactory); s.setCaching(nbRows); return s; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java index 17b11109c09..c707e459211 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java @@ -109,7 +109,7 @@ public class ClientSmallReversedScanner extends ReversedClientScanner { } smallScanCallable = ClientSmallScanner.getSmallScanCallable( - scan, getConnection(), getTable(), localStartKey, cacheNum); + scan, getConnection(), getTable(), localStartKey, cacheNum, this.rpcControllerFactory); if (this.scanMetrics != null && skipRowOfFirstResult == null) { this.scanMetrics.countOfRegions.incrementAndGet(); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java index dd20f0a4984..1297ee4e111 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.RequestConverter; import org.apache.hadoop.hbase.protobuf.ResponseConverter; @@ -85,9 +86,20 @@ public class ClientSmallScanner extends ClientScanner { */ public ClientSmallScanner(final Configuration conf, final Scan scan, final TableName tableName, HConnection connection) throws IOException { - this(conf, scan, tableName, connection, new RpcRetryingCallerFactory(conf)); + this(conf, scan, tableName, connection, new RpcRetryingCallerFactory(conf), + new RpcControllerFactory(conf)); } + /** + * @deprecated use + * {@link #ClientSmallScanner(Configuration, Scan, TableName, HConnection, RpcRetryingCallerFactory, RpcControllerFactory) + * instead + */ + public ClientSmallScanner(final Configuration conf, final Scan scan, final TableName tableName, + HConnection connection, RpcRetryingCallerFactory rpcFactory) throws IOException { + this(conf, scan, tableName, connection, rpcFactory, RpcControllerFactory.instantiate(conf)); + } + /** * Create a new ShortClientScanner for the specified table Note that the * passed {@link Scan}'s start row maybe changed changed. @@ -99,10 +111,10 @@ public class ClientSmallScanner extends ClientScanner { * @param rpcFactory * @throws IOException */ - public ClientSmallScanner(final Configuration conf, final Scan scan, - final TableName tableName, HConnection connection, - RpcRetryingCallerFactory rpcFactory) throws IOException { - super(conf, scan, tableName, connection, rpcFactory); + public ClientSmallScanner(final Configuration conf, final Scan scan, final TableName tableName, + HConnection connection, RpcRetryingCallerFactory rpcFactory, + RpcControllerFactory controllerFactory) throws IOException { + super(conf, scan, tableName, connection, rpcFactory, controllerFactory); } @Override @@ -154,7 +166,7 @@ public class ClientSmallScanner extends ClientScanner { + Bytes.toStringBinary(localStartKey) + "'"); } smallScanCallable = getSmallScanCallable( - scan, getConnection(), getTable(), localStartKey, cacheNum); + scan, getConnection(), getTable(), localStartKey, cacheNum, rpcControllerFactory); if (this.scanMetrics != null && skipRowOfFirstResult == null) { this.scanMetrics.countOfRegions.incrementAndGet(); } @@ -163,14 +175,14 @@ public class ClientSmallScanner extends ClientScanner { static RegionServerCallable getSmallScanCallable( final Scan sc, HConnection connection, TableName table, - byte[] localStartKey, final int cacheNum) { + byte[] localStartKey, final int cacheNum, final RpcControllerFactory rpcControllerFactory) { sc.setStartRow(localStartKey); RegionServerCallable callable = new RegionServerCallable( connection, table, sc.getStartRow()) { public Result[] call(int callTimeout) throws IOException { ScanRequest request = RequestConverter.buildScanRequest(getLocation() .getRegionInfo().getRegionName(), sc, cacheNum, true); - PayloadCarryingRpcController controller = new PayloadCarryingRpcController(); + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); controller.setPriority(getTableName()); controller.setCallTimeout(callTimeout); try { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java index a1848dd4abe..53d6690adcb 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java @@ -68,6 +68,7 @@ import org.apache.hadoop.hbase.client.coprocessor.Batch; import org.apache.hadoop.hbase.exceptions.RegionMovedException; import org.apache.hadoop.hbase.exceptions.RegionOpeningException; import org.apache.hadoop.hbase.ipc.RpcClient; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.RequestConverter; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService; @@ -2200,8 +2201,8 @@ class ConnectionManager { // For tests to override. protected AsyncProcess createAsyncProcess(Configuration conf) { // No default pool available. - return new AsyncProcess( - this, conf, this.batchPool, RpcRetryingCallerFactory.instantiate(conf), false); + return new AsyncProcess(this, conf, this.batchPool, + RpcRetryingCallerFactory.instantiate(conf), false, RpcControllerFactory.instantiate(conf)); } @Override diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index 49d9354a533..13025b28f31 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -61,6 +61,7 @@ import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; import org.apache.hadoop.hbase.ipc.RegionCoprocessorRpcChannel; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.RequestConverter; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; @@ -146,6 +147,7 @@ public class HTable implements HTableInterface { /** The Async process for batch */ protected AsyncProcess multiAp; private RpcRetryingCallerFactory rpcCallerFactory; + private RpcControllerFactory rpcControllerFactory; /** * Creates an object to access a HBase table. @@ -362,8 +364,9 @@ public class HTable implements HTableInterface { HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING); this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(configuration); + this.rpcControllerFactory = RpcControllerFactory.instantiate(configuration); // puts need to track errors globally due to how the APIs currently work. - ap = new AsyncProcess(connection, configuration, pool, rpcCallerFactory, true); + ap = new AsyncProcess(connection, configuration, pool, rpcCallerFactory, true, rpcControllerFactory); multiAp = this.connection.getAsyncProcess(); this.maxKeyValueSize = this.configuration.getInt( @@ -725,7 +728,7 @@ public class HTable implements HTableInterface { RegionServerCallable callable = new RegionServerCallable(this.connection, tableName, row) { public Result call(int callTimeout) throws IOException { - PayloadCarryingRpcController controller = new PayloadCarryingRpcController(); + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); controller.setPriority(tableName); controller.setCallTimeout(callTimeout); ClientProtos.GetRequest request = RequestConverter.buildGetRowOrBeforeRequest( @@ -763,10 +766,10 @@ public class HTable implements HTableInterface { if (scan.isSmall()) { return new ClientSmallScanner(getConfiguration(), scan, getName(), - this.connection, this.rpcCallerFactory); + this.connection, this.rpcCallerFactory, this.rpcControllerFactory); } else { - return new ClientScanner(getConfiguration(), scan, - getName(), this.connection); + return new ClientScanner(getConfiguration(), scan, getName(), this.connection, + this.rpcCallerFactory, this.rpcControllerFactory); } } @@ -801,7 +804,7 @@ public class HTable implements HTableInterface { public Result call(int callTimeout) throws IOException { ClientProtos.GetRequest request = RequestConverter.buildGetRequest(getLocation().getRegionInfo().getRegionName(), get); - PayloadCarryingRpcController controller = new PayloadCarryingRpcController(); + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); controller.setPriority(tableName); controller.setCallTimeout(callTimeout); try { @@ -902,7 +905,7 @@ public class HTable implements HTableInterface { RegionServerCallable callable = new RegionServerCallable(connection, tableName, delete.getRow()) { public Boolean call(int callTimeout) throws IOException { - PayloadCarryingRpcController controller = new PayloadCarryingRpcController(); + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); controller.setPriority(tableName); controller.setCallTimeout(callTimeout); @@ -1042,7 +1045,7 @@ public class HTable implements HTableInterface { RegionServerCallable callable = new RegionServerCallable(connection, getName(), rm.getRow()) { public Void call(int callTimeout) throws IOException { - PayloadCarryingRpcController controller = new PayloadCarryingRpcController(); + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); controller.setPriority(tableName); controller.setCallTimeout(callTimeout); try { @@ -1076,7 +1079,7 @@ public class HTable implements HTableInterface { RegionServerCallable callable = new RegionServerCallable(this.connection, getName(), append.getRow()) { public Result call(int callTimeout) throws IOException { - PayloadCarryingRpcController controller = new PayloadCarryingRpcController(); + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); controller.setPriority(getTableName()); controller.setCallTimeout(callTimeout); try { @@ -1107,7 +1110,7 @@ public class HTable implements HTableInterface { RegionServerCallable callable = new RegionServerCallable(this.connection, getName(), increment.getRow()) { public Result call(int callTimeout) throws IOException { - PayloadCarryingRpcController controller = new PayloadCarryingRpcController(); + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); controller.setPriority(getTableName()); controller.setCallTimeout(callTimeout); try { @@ -1118,8 +1121,8 @@ public class HTable implements HTableInterface { } catch (ServiceException se) { throw ProtobufUtil.getRemoteException(se); } - } - }; + } + }; return rpcCallerFactory. newCaller().callWithRetries(callable, this.operationTimeout); } @@ -1170,7 +1173,7 @@ public class HTable implements HTableInterface { RegionServerCallable callable = new RegionServerCallable(connection, getName(), row) { public Long call(int callTimeout) throws IOException { - PayloadCarryingRpcController controller = new PayloadCarryingRpcController(); + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); controller.setPriority(getTableName()); controller.setCallTimeout(callTimeout); try { @@ -1200,7 +1203,7 @@ public class HTable implements HTableInterface { RegionServerCallable callable = new RegionServerCallable(connection, getName(), row) { public Boolean call(int callTimeout) throws IOException { - PayloadCarryingRpcController controller = new PayloadCarryingRpcController(); + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); controller.setPriority(tableName); controller.setCallTimeout(callTimeout); try { @@ -1257,7 +1260,7 @@ public class HTable implements HTableInterface { RegionServerCallable callable = new RegionServerCallable(connection, getName(), row) { public Boolean call(int callTimeout) throws IOException { - PayloadCarryingRpcController controller = new PayloadCarryingRpcController(); + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); controller.setPriority(tableName); controller.setCallTimeout(callTimeout); try { @@ -1285,7 +1288,7 @@ public class HTable implements HTableInterface { RegionServerCallable callable = new RegionServerCallable(connection, getName(), row) { public Boolean call(int callTimeout) throws IOException { - PayloadCarryingRpcController controller = new PayloadCarryingRpcController(); + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); controller.setPriority(tableName); controller.setCallTimeout(callTimeout); try { @@ -1761,8 +1764,10 @@ public class HTable implements HTableInterface { final List callbackErrorServers = new ArrayList(); Object[] results = new Object[execs.size()]; - AsyncProcess asyncProcess = new AsyncProcess(connection, configuration, pool, - RpcRetryingCallerFactory.instantiate(configuration), true); + AsyncProcess asyncProcess = + new AsyncProcess(connection, configuration, pool, + RpcRetryingCallerFactory.instantiate(configuration), true, + RpcControllerFactory.instantiate(configuration)); AsyncRequestFuture future = asyncProcess.submitAll(tableName, execs, new Callback() { @Override diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java index 47d8d545b5f..cc26ecf5dd5 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java @@ -31,14 +31,15 @@ import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.RequestConverter; import org.apache.hadoop.hbase.protobuf.ResponseConverter; -import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction; +import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos; import com.google.protobuf.ServiceException; @@ -51,10 +52,12 @@ import com.google.protobuf.ServiceException; class MultiServerCallable extends RegionServerCallable { private final MultiAction multiAction; private final boolean cellBlock; + private RpcControllerFactory rpcFactory; MultiServerCallable(final HConnection connection, final TableName tableName, - final ServerName location, final MultiAction multi) { + final ServerName location, RpcControllerFactory rpcFactory, final MultiAction multi) { super(connection, tableName, null); + this.rpcFactory = rpcFactory; this.multiAction = multi; // RegionServerCallable has HRegionLocation field, but this is a multi-region request. // Using region info from parent HRegionLocation would be a mistake for this class; so @@ -115,7 +118,7 @@ class MultiServerCallable extends RegionServerCallable { // Controller optionally carries cell data over the proxy/service boundary and also // optionally ferries cell response data back out again. - PayloadCarryingRpcController controller = new PayloadCarryingRpcController(cells); + PayloadCarryingRpcController controller = rpcFactory.newController(cells); controller.setPriority(getTableName()); controller.setCallTimeout(callTimeout); ClientProtos.MultiResponse responseProto; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java index d6e17ae8f55..727eeca53f9 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java @@ -125,8 +125,9 @@ public class ReversedClientScanner extends ClientScanner { protected ScannerCallable getScannerCallable(byte[] localStartKey, int nbRows, byte[] locateStartRow) { scan.setStartRow(localStartKey); - ScannerCallable s = new ReversedScannerCallable(getConnection(), - getTable(), scan, this.scanMetrics, locateStartRow); + ScannerCallable s = + new ReversedScannerCallable(getConnection(), getTable(), scan, this.scanMetrics, + locateStartRow, this.rpcControllerFactory); s.setCaching(nbRows); return s; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java index a974b01c6f6..f05e3814c87 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java @@ -29,8 +29,11 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.metrics.ScanMetrics; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.util.Bytes; +import com.google.protobuf.RpcController; + /** * A reversed ScannerCallable which supports backward scanning. */ @@ -45,17 +48,28 @@ public class ReversedScannerCallable extends ScannerCallable { protected final byte[] locateStartRow; /** - * * @param connection * @param tableName * @param scan * @param scanMetrics * @param locateStartRow The start row for locating regions + * @param rpcFactory to create an {@link RpcController} to talk to the regionserver */ + public ReversedScannerCallable(HConnection connection, TableName tableName, Scan scan, + ScanMetrics scanMetrics, byte[] locateStartRow, RpcControllerFactory rpcFactory) { + super(connection, tableName, scan, scanMetrics, rpcFactory); + this.locateStartRow = locateStartRow; + } + + /** + * @deprecated use + * {@link #ReversedScannerCallable(HConnection, TableName, Scan, ScanMetrics, byte[], RpcControllerFactory)} + */ + @Deprecated public ReversedScannerCallable(HConnection connection, TableName tableName, Scan scan, ScanMetrics scanMetrics, byte[] locateStartRow) { - super(connection, tableName, scan, scanMetrics); - this.locateStartRow = locateStartRow; + this(connection, tableName, scan, scanMetrics, locateStartRow, RpcControllerFactory + .instantiate(connection.getConfiguration())); } /** diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java index 798352c5117..bb29903bd6b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java @@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.UnknownScannerException; import org.apache.hadoop.hbase.client.metrics.ScanMetrics; import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.RequestConverter; import org.apache.hadoop.hbase.protobuf.ResponseConverter; @@ -46,6 +47,7 @@ import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.net.DNS; +import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; import com.google.protobuf.TextFormat; @@ -81,22 +83,25 @@ public class ScannerCallable extends RegionServerCallable { // indicate if it is a remote server call protected boolean isRegionServerRemote = true; private long nextCallSeq = 0; + private RpcControllerFactory rpcFactory; /** * @param connection which connection * @param tableName table callable is on * @param scan the scan to execute - * @param scanMetrics the ScanMetrics to used, if it is null, ScannerCallable - * won't collect metrics + * @param scanMetrics the ScanMetrics to used, if it is null, ScannerCallable won't collect + * metrics + * @param rpcControllerFactory factory to use when creating {@link RpcController} */ public ScannerCallable (HConnection connection, TableName tableName, Scan scan, - ScanMetrics scanMetrics) { + ScanMetrics scanMetrics, RpcControllerFactory rpcControllerFactory) { super(connection, tableName, scan.getStartRow()); this.scan = scan; this.scanMetrics = scanMetrics; Configuration conf = connection.getConfiguration(); logScannerActivity = conf.getBoolean(LOG_SCANNER_ACTIVITY, false); logCutOffLatency = conf.getInt(LOG_SCANNER_LATENCY_CUTOFF, 1000); + this.rpcFactory = rpcControllerFactory; } /** @@ -105,7 +110,8 @@ public class ScannerCallable extends RegionServerCallable { @Deprecated public ScannerCallable (HConnection connection, final byte [] tableName, Scan scan, ScanMetrics scanMetrics) { - this(connection, TableName.valueOf(tableName), scan, scanMetrics); + this(connection, TableName.valueOf(tableName), scan, scanMetrics, RpcControllerFactory + .instantiate(connection.getConfiguration())); } /** @@ -161,7 +167,7 @@ public class ScannerCallable extends RegionServerCallable { incRPCcallsMetrics(); request = RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq); ScanResponse response = null; - PayloadCarryingRpcController controller = new PayloadCarryingRpcController(); + PayloadCarryingRpcController controller = rpcFactory.newController(); controller.setPriority(getTableName()); controller.setCallTimeout(callTimeout); try { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingPayloadCarryingRpcController.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingPayloadCarryingRpcController.java new file mode 100644 index 00000000000..9f23770f78e --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingPayloadCarryingRpcController.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.TableName; + +/** + * Simple delegating controller for use with the {@link RpcControllerFactory} to help override + * standard behavior of a {@link PayloadCarryingRpcController}. + */ +public class DelegatingPayloadCarryingRpcController extends PayloadCarryingRpcController { + private PayloadCarryingRpcController delegate; + + public DelegatingPayloadCarryingRpcController(PayloadCarryingRpcController delegate) { + this.delegate = delegate; + } + + @Override + public CellScanner cellScanner() { + return delegate.cellScanner(); + } + + @Override + public void setCellScanner(final CellScanner cellScanner) { + delegate.setCellScanner(cellScanner); + } + + @Override + public void setPriority(int priority) { + delegate.setPriority(priority); + } + + @Override + public void setPriority(final TableName tn) { + delegate.setPriority(tn); + } + + @Override + public int getPriority() { + return delegate.getPriority(); + } +} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcControllerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcControllerFactory.java new file mode 100644 index 00000000000..2ffab8d98b5 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcControllerFactory.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.CellScannable; +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.util.ReflectionUtils; + +/** + * Factory to create a {@link PayloadCarryingRpcController} + */ +public class RpcControllerFactory { + + public static final String CUSTOM_CONTROLLER_CONF_KEY = "hbase.rpc.controllerfactory.class"; + protected final Configuration conf; + + public RpcControllerFactory(Configuration conf) { + this.conf = conf; + } + + public PayloadCarryingRpcController newController() { + return new PayloadCarryingRpcController(); + } + + public PayloadCarryingRpcController newController(final CellScanner cellScanner) { + return new PayloadCarryingRpcController(cellScanner); + } + + public PayloadCarryingRpcController newController(final List cellIterables) { + return new PayloadCarryingRpcController(cellIterables); + } + + + public static RpcControllerFactory instantiate(Configuration configuration) { + String rpcControllerFactoryClazz = + configuration.get(CUSTOM_CONTROLLER_CONF_KEY, + RpcControllerFactory.class.getName()); + return ReflectionUtils.instantiateWithCustomCtor(rpcControllerFactoryClazz, + new Class[] { Configuration.class }, new Object[] { configuration }); + } +} \ No newline at end of file diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java index 5445f4d9531..c31e4518b42 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture; import org.apache.hadoop.hbase.client.coprocessor.Batch; import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Threads; import org.junit.Assert; @@ -125,14 +126,14 @@ public class TestAsyncProcess { public MyAsyncProcess(ClusterConnection hc, Configuration conf, AtomicInteger nbThreads) { super(hc, conf, new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS, new SynchronousQueue(), new CountingThreadFactory(nbThreads)), - new RpcRetryingCallerFactory(conf), false); + new RpcRetryingCallerFactory(conf), false, new RpcControllerFactory(conf)); } public MyAsyncProcess( ClusterConnection hc, Configuration conf, boolean useGlobalErrors) { super(hc, conf, new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS, new SynchronousQueue(), new CountingThreadFactory(new AtomicInteger())), - new RpcRetryingCallerFactory(conf), useGlobalErrors); + new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf)); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java index ffb79c19747..1c08facc8c3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java @@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.RegionServerCallable; import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory; import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos; @@ -69,6 +70,7 @@ public class WALEditsReplaySink { private final AtomicLong totalReplayedEdits = new AtomicLong(); private final boolean skipErrors; private final int replayTimeout; + private RpcControllerFactory rpcControllerFactory; /** * Create a sink for WAL log entries replay @@ -87,6 +89,7 @@ public class WALEditsReplaySink { HConstants.DEFAULT_HREGION_EDITS_REPLAY_SKIP_ERRORS); // a single replay operation time out and default is 60 seconds this.replayTimeout = conf.getInt("hbase.regionserver.logreplay.timeout", 60000); + this.rpcControllerFactory = RpcControllerFactory.instantiate(conf); } /** @@ -211,7 +214,7 @@ public class WALEditsReplaySink { Pair p = ReplicationProtbufUtil.buildReplicateWALEntryRequest(entriesArray); - PayloadCarryingRpcController controller = new PayloadCarryingRpcController(p.getSecond()); + PayloadCarryingRpcController controller = rpcControllerFactory.newController(p.getSecond()); try { remoteSvr.replay(controller, p.getFirst()); } catch (ServiceException se) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java index 35471a932a3..5a86ab506bb 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.client.ConnectionManager.HConnectionImplementation; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.mockito.Mockito; /** @@ -121,8 +122,9 @@ public class HConnectionTestingUtility { } NonceGenerator ng = Mockito.mock(NonceGenerator.class); Mockito.when(c.getNonceGenerator()).thenReturn(ng); - Mockito.when(c.getAsyncProcess()).thenReturn(new AsyncProcess( - c, conf, null, RpcRetryingCallerFactory.instantiate(conf), false)); + Mockito.when(c.getAsyncProcess()).thenReturn( + new AsyncProcess(c, conf, null, RpcRetryingCallerFactory.instantiate(conf), false, + RpcControllerFactory.instantiate(conf))); Mockito.doNothing().when(c).incCount(); Mockito.doNothing().when(c).decCount(); return c; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java new file mode 100644 index 00000000000..02c2ef8f17f --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java @@ -0,0 +1,203 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1; +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.CellScannable; +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.MediumTests; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; +import org.apache.hadoop.hbase.coprocessor.ProtobufCoprocessorService; +import org.apache.hadoop.hbase.ipc.DelegatingPayloadCarryingRpcController; +import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import com.google.common.collect.Lists; + +@Category(MediumTests.class) +public class TestRpcControllerFactory { + + public static class StaticRpcControllerFactory extends RpcControllerFactory { + + public StaticRpcControllerFactory(Configuration conf) { + super(conf); + } + + public PayloadCarryingRpcController newController() { + return new CountingRpcController(super.newController()); + } + + public PayloadCarryingRpcController newController(final CellScanner cellScanner) { + return new CountingRpcController(super.newController(cellScanner)); + } + + public PayloadCarryingRpcController newController(final List cellIterables) { + return new CountingRpcController(super.newController(cellIterables)); + } + } + + public static class CountingRpcController extends DelegatingPayloadCarryingRpcController { + + private static AtomicInteger INT_PRIORITY = new AtomicInteger(); + private static AtomicInteger TABLE_PRIORITY = new AtomicInteger(); + + public CountingRpcController(PayloadCarryingRpcController delegate) { + super(delegate); + } + + @Override + public void setPriority(int priority) { + super.setPriority(priority); + INT_PRIORITY.incrementAndGet(); + } + + @Override + public void setPriority(TableName tn) { + super.setPriority(tn); + // ignore counts for system tables - it could change and we really only want to check on what + // the client should change + if (!tn.isSystemTable()) { + TABLE_PRIORITY.incrementAndGet(); + } + + } + } + + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + + @BeforeClass + public static void setup() throws Exception { + // load an endpoint so we have an endpoint to test - it doesn't matter which one, but + // this is already in tests, so we can just use it. + Configuration conf = UTIL.getConfiguration(); + conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, + ProtobufCoprocessorService.class.getName()); + + UTIL.startMiniCluster(); + } + + @AfterClass + public static void teardown() throws Exception { + UTIL.shutdownMiniCluster(); + } + + /** + * check some of the methods and make sure we are incrementing each time. Its a bit tediuous to + * cover all methods here and really is a bit brittle since we can always add new methods but + * won't be sure to add them here. So we just can cover the major ones. + * @throws Exception on failure + */ + @Test + public void testCountController() throws Exception { + Configuration conf = new Configuration(UTIL.getConfiguration()); + // setup our custom controller + conf.set(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY, + StaticRpcControllerFactory.class.getName()); + + TableName name = TableName.valueOf("testcustomcontroller"); + UTIL.createTable(name, fam1).close(); + + // change one of the connection properties so we get a new HConnection with our configuration + conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT + 1); + + HTable table = new HTable(conf, name); + table.setAutoFlushTo(false); + byte[] row = Bytes.toBytes("row"); + Put p = new Put(row); + p.add(fam1, fam1, Bytes.toBytes("val0")); + table.put(p); + table.flushCommits(); + Integer counter = 1; + counter = verifyCount(counter); + + Delete d = new Delete(row); + d.deleteColumn(fam1, fam1); + table.delete(d); + counter = verifyCount(counter); + + Put p2 = new Put(row); + p2.add(fam1, Bytes.toBytes("qual"), Bytes.toBytes("val1")); + table.batch(Lists.newArrayList(p, p2), new Object[2]); + // this only goes to a single server, so we don't need to change the count here + counter = verifyCount(counter); + + Append append = new Append(row); + append.add(fam1, fam1, Bytes.toBytes("val2")); + table.append(append); + counter = verifyCount(counter); + + // and check the major lookup calls as well + Get g = new Get(row); + table.get(g); + counter = verifyCount(counter); + + ResultScanner scan = table.getScanner(fam1); + scan.next(); + scan.close(); + counter = verifyCount(counter); + + Get g2 = new Get(row); + table.get(Lists.newArrayList(g, g2)); + // same server, so same as above for not changing count + counter = verifyCount(counter); + + // make sure all the scanner types are covered + Scan scanInfo = new Scan(row); + // regular small + scanInfo.setSmall(true); + counter = doScan(table, scanInfo, counter); + + // reversed, small + scanInfo.setReversed(true); + counter = doScan(table, scanInfo, counter); + + // reversed, regular + scanInfo.setSmall(false); + counter = doScan(table, scanInfo, counter); + + table.close(); + } + + int doScan(HTable table, Scan scan, int expectedCount) throws IOException { + ResultScanner results = table.getScanner(scan); + results.next(); + results.close(); + return verifyCount(expectedCount); + } + + int verifyCount(Integer counter) { + assertEquals(counter.intValue(), CountingRpcController.TABLE_PRIORITY.get()); + assertEquals(0, CountingRpcController.INT_PRIORITY.get()); + return counter + 1; + } +} \ No newline at end of file