HBASE-11048 Support setting custom priority per client RPC
This commit is contained in:
parent
0b883059ea
commit
c61cb7fb55
|
@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.HRegionLocation;
|
||||||
import org.apache.hadoop.hbase.ServerName;
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.client.coprocessor.Batch;
|
import org.apache.hadoop.hbase.client.coprocessor.Batch;
|
||||||
|
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||||
import org.htrace.Trace;
|
import org.htrace.Trace;
|
||||||
|
@ -120,6 +121,7 @@ class AsyncProcess {
|
||||||
|
|
||||||
protected final ClusterConnection hConnection;
|
protected final ClusterConnection hConnection;
|
||||||
protected final RpcRetryingCallerFactory rpcCallerFactory;
|
protected final RpcRetryingCallerFactory rpcCallerFactory;
|
||||||
|
protected final RpcControllerFactory rpcFactory;
|
||||||
protected final BatchErrors globalErrors;
|
protected final BatchErrors globalErrors;
|
||||||
protected final ExecutorService pool;
|
protected final ExecutorService pool;
|
||||||
|
|
||||||
|
@ -188,7 +190,7 @@ class AsyncProcess {
|
||||||
}
|
}
|
||||||
|
|
||||||
public AsyncProcess(ClusterConnection hc, Configuration conf, ExecutorService pool,
|
public AsyncProcess(ClusterConnection hc, Configuration conf, ExecutorService pool,
|
||||||
RpcRetryingCallerFactory rpcCaller, boolean useGlobalErrors) {
|
RpcRetryingCallerFactory rpcCaller, boolean useGlobalErrors, RpcControllerFactory rpcFactory) {
|
||||||
if (hc == null) {
|
if (hc == null) {
|
||||||
throw new IllegalArgumentException("HConnection cannot be null.");
|
throw new IllegalArgumentException("HConnection cannot be null.");
|
||||||
}
|
}
|
||||||
|
@ -242,8 +244,8 @@ class AsyncProcess {
|
||||||
serverTrackerTimeout += ConnectionUtils.getPauseTime(this.pause, i);
|
serverTrackerTimeout += ConnectionUtils.getPauseTime(this.pause, i);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
this.rpcCallerFactory = rpcCaller;
|
this.rpcCallerFactory = rpcCaller;
|
||||||
|
this.rpcFactory = rpcFactory;
|
||||||
}
|
}
|
||||||
|
|
||||||
private ExecutorService getPool(ExecutorService pool) {
|
private ExecutorService getPool(ExecutorService pool) {
|
||||||
|
@ -950,7 +952,7 @@ class AsyncProcess {
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
protected MultiServerCallable<Row> createCallable(final ServerName server,
|
protected MultiServerCallable<Row> createCallable(final ServerName server,
|
||||||
TableName tableName, final MultiAction<Row> multi) {
|
TableName tableName, final MultiAction<Row> multi) {
|
||||||
return new MultiServerCallable<Row>(hConnection, tableName, server, multi);
|
return new MultiServerCallable<Row>(hConnection, tableName, server, this.rpcFactory, multi);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.NotServingRegionException;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.UnknownScannerException;
|
import org.apache.hadoop.hbase.UnknownScannerException;
|
||||||
import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
|
import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
|
||||||
|
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
|
||||||
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
||||||
import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos;
|
import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos;
|
||||||
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
|
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
|
||||||
|
@ -66,6 +67,7 @@ public class ClientScanner extends AbstractClientScanner {
|
||||||
protected final int scannerTimeout;
|
protected final int scannerTimeout;
|
||||||
protected boolean scanMetricsPublished = false;
|
protected boolean scanMetricsPublished = false;
|
||||||
protected RpcRetryingCaller<Result []> caller;
|
protected RpcRetryingCaller<Result []> caller;
|
||||||
|
protected RpcControllerFactory rpcControllerFactory;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a new ClientScanner for the specified table. An HConnection will be
|
* Create a new ClientScanner for the specified table. An HConnection will be
|
||||||
|
@ -93,27 +95,36 @@ public class ClientScanner extends AbstractClientScanner {
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a new ClientScanner for the specified table
|
* @deprecated use
|
||||||
* Note that the passed {@link Scan}'s start row maybe changed changed.
|
* {@link #ClientScanner(Configuration, Scan, TableName, HConnection, RpcRetryingCallerFactory, RpcControllerFactory)}
|
||||||
*
|
|
||||||
* @param conf The {@link Configuration} to use.
|
|
||||||
* @param scan {@link Scan} to use in this scanner
|
|
||||||
* @param tableName The table that we wish to scan
|
|
||||||
* @param connection Connection identifying the cluster
|
|
||||||
* @throws IOException
|
|
||||||
*/
|
*/
|
||||||
|
@Deprecated
|
||||||
public ClientScanner(final Configuration conf, final Scan scan, final TableName tableName,
|
public ClientScanner(final Configuration conf, final Scan scan, final TableName tableName,
|
||||||
HConnection connection) throws IOException {
|
HConnection connection) throws IOException {
|
||||||
this(conf, scan, tableName, connection, new RpcRetryingCallerFactory(conf));
|
this(conf, scan, tableName, connection, new RpcRetryingCallerFactory(conf),
|
||||||
|
RpcControllerFactory.instantiate(conf));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @deprecated Use {@link #ClientScanner(Configuration, Scan, TableName, HConnection)}
|
* @deprecated Use
|
||||||
|
* {@link #ClientScanner(Configuration, Scan, TableName, HConnection, RpcRetryingCallerFactory, RpcControllerFactory)}
|
||||||
*/
|
*/
|
||||||
@Deprecated
|
@Deprecated
|
||||||
public ClientScanner(final Configuration conf, final Scan scan, final byte [] tableName,
|
public ClientScanner(final Configuration conf, final Scan scan, final byte [] tableName,
|
||||||
HConnection connection) throws IOException {
|
HConnection connection) throws IOException {
|
||||||
this(conf, scan, TableName.valueOf(tableName), connection, new RpcRetryingCallerFactory(conf));
|
this(conf, scan, TableName.valueOf(tableName), connection, new RpcRetryingCallerFactory(conf),
|
||||||
|
RpcControllerFactory.instantiate(conf));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @deprecated Use
|
||||||
|
* {@link #ClientScanner(Configuration, Scan, TableName, HConnection, RpcRetryingCallerFactory, RpcControllerFactory)
|
||||||
|
* instead.
|
||||||
|
*/
|
||||||
|
@Deprecated
|
||||||
|
public ClientScanner(final Configuration conf, final Scan scan, final TableName tableName,
|
||||||
|
HConnection connection, RpcRetryingCallerFactory rpcFactory) throws IOException {
|
||||||
|
this(conf, scan, tableName, connection, rpcFactory, RpcControllerFactory.instantiate(conf));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -126,7 +137,8 @@ public class ClientScanner extends AbstractClientScanner {
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
public ClientScanner(final Configuration conf, final Scan scan, final TableName tableName,
|
public ClientScanner(final Configuration conf, final Scan scan, final TableName tableName,
|
||||||
HConnection connection, RpcRetryingCallerFactory rpcFactory) throws IOException {
|
HConnection connection, RpcRetryingCallerFactory rpcFactory,
|
||||||
|
RpcControllerFactory controllerFactory) throws IOException {
|
||||||
if (LOG.isTraceEnabled()) {
|
if (LOG.isTraceEnabled()) {
|
||||||
LOG.trace("Scan table=" + tableName
|
LOG.trace("Scan table=" + tableName
|
||||||
+ ", startRow=" + Bytes.toStringBinary(scan.getStartRow()));
|
+ ", startRow=" + Bytes.toStringBinary(scan.getStartRow()));
|
||||||
|
@ -160,6 +172,7 @@ public class ClientScanner extends AbstractClientScanner {
|
||||||
}
|
}
|
||||||
|
|
||||||
this.caller = rpcFactory.<Result[]> newCaller();
|
this.caller = rpcFactory.<Result[]> newCaller();
|
||||||
|
this.rpcControllerFactory = controllerFactory;
|
||||||
|
|
||||||
initializeScannerInConstruction();
|
initializeScannerInConstruction();
|
||||||
}
|
}
|
||||||
|
@ -277,8 +290,9 @@ public class ClientScanner extends AbstractClientScanner {
|
||||||
protected ScannerCallable getScannerCallable(byte [] localStartKey,
|
protected ScannerCallable getScannerCallable(byte [] localStartKey,
|
||||||
int nbRows) {
|
int nbRows) {
|
||||||
scan.setStartRow(localStartKey);
|
scan.setStartRow(localStartKey);
|
||||||
ScannerCallable s = new ScannerCallable(getConnection(),
|
ScannerCallable s =
|
||||||
getTable(), scan, this.scanMetrics);
|
new ScannerCallable(getConnection(), getTable(), scan, this.scanMetrics,
|
||||||
|
this.rpcControllerFactory);
|
||||||
s.setCaching(nbRows);
|
s.setCaching(nbRows);
|
||||||
return s;
|
return s;
|
||||||
}
|
}
|
||||||
|
|
|
@ -109,7 +109,7 @@ public class ClientSmallReversedScanner extends ReversedClientScanner {
|
||||||
}
|
}
|
||||||
|
|
||||||
smallScanCallable = ClientSmallScanner.getSmallScanCallable(
|
smallScanCallable = ClientSmallScanner.getSmallScanCallable(
|
||||||
scan, getConnection(), getTable(), localStartKey, cacheNum);
|
scan, getConnection(), getTable(), localStartKey, cacheNum, this.rpcControllerFactory);
|
||||||
|
|
||||||
if (this.scanMetrics != null && skipRowOfFirstResult == null) {
|
if (this.scanMetrics != null && skipRowOfFirstResult == null) {
|
||||||
this.scanMetrics.countOfRegions.incrementAndGet();
|
this.scanMetrics.countOfRegions.incrementAndGet();
|
||||||
|
|
|
@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
|
||||||
import org.apache.hadoop.hbase.KeyValueUtil;
|
import org.apache.hadoop.hbase.KeyValueUtil;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
|
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
|
||||||
|
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
|
||||||
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
||||||
import org.apache.hadoop.hbase.protobuf.RequestConverter;
|
import org.apache.hadoop.hbase.protobuf.RequestConverter;
|
||||||
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
|
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
|
||||||
|
@ -85,7 +86,18 @@ public class ClientSmallScanner extends ClientScanner {
|
||||||
*/
|
*/
|
||||||
public ClientSmallScanner(final Configuration conf, final Scan scan,
|
public ClientSmallScanner(final Configuration conf, final Scan scan,
|
||||||
final TableName tableName, HConnection connection) throws IOException {
|
final TableName tableName, HConnection connection) throws IOException {
|
||||||
this(conf, scan, tableName, connection, new RpcRetryingCallerFactory(conf));
|
this(conf, scan, tableName, connection, new RpcRetryingCallerFactory(conf),
|
||||||
|
new RpcControllerFactory(conf));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @deprecated use
|
||||||
|
* {@link #ClientSmallScanner(Configuration, Scan, TableName, HConnection, RpcRetryingCallerFactory, RpcControllerFactory)
|
||||||
|
* instead
|
||||||
|
*/
|
||||||
|
public ClientSmallScanner(final Configuration conf, final Scan scan, final TableName tableName,
|
||||||
|
HConnection connection, RpcRetryingCallerFactory rpcFactory) throws IOException {
|
||||||
|
this(conf, scan, tableName, connection, rpcFactory, RpcControllerFactory.instantiate(conf));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -99,10 +111,10 @@ public class ClientSmallScanner extends ClientScanner {
|
||||||
* @param rpcFactory
|
* @param rpcFactory
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
public ClientSmallScanner(final Configuration conf, final Scan scan,
|
public ClientSmallScanner(final Configuration conf, final Scan scan, final TableName tableName,
|
||||||
final TableName tableName, HConnection connection,
|
HConnection connection, RpcRetryingCallerFactory rpcFactory,
|
||||||
RpcRetryingCallerFactory rpcFactory) throws IOException {
|
RpcControllerFactory controllerFactory) throws IOException {
|
||||||
super(conf, scan, tableName, connection, rpcFactory);
|
super(conf, scan, tableName, connection, rpcFactory, controllerFactory);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -154,7 +166,7 @@ public class ClientSmallScanner extends ClientScanner {
|
||||||
+ Bytes.toStringBinary(localStartKey) + "'");
|
+ Bytes.toStringBinary(localStartKey) + "'");
|
||||||
}
|
}
|
||||||
smallScanCallable = getSmallScanCallable(
|
smallScanCallable = getSmallScanCallable(
|
||||||
scan, getConnection(), getTable(), localStartKey, cacheNum);
|
scan, getConnection(), getTable(), localStartKey, cacheNum, rpcControllerFactory);
|
||||||
if (this.scanMetrics != null && skipRowOfFirstResult == null) {
|
if (this.scanMetrics != null && skipRowOfFirstResult == null) {
|
||||||
this.scanMetrics.countOfRegions.incrementAndGet();
|
this.scanMetrics.countOfRegions.incrementAndGet();
|
||||||
}
|
}
|
||||||
|
@ -163,14 +175,14 @@ public class ClientSmallScanner extends ClientScanner {
|
||||||
|
|
||||||
static RegionServerCallable<Result[]> getSmallScanCallable(
|
static RegionServerCallable<Result[]> getSmallScanCallable(
|
||||||
final Scan sc, HConnection connection, TableName table,
|
final Scan sc, HConnection connection, TableName table,
|
||||||
byte[] localStartKey, final int cacheNum) {
|
byte[] localStartKey, final int cacheNum, final RpcControllerFactory rpcControllerFactory) {
|
||||||
sc.setStartRow(localStartKey);
|
sc.setStartRow(localStartKey);
|
||||||
RegionServerCallable<Result[]> callable = new RegionServerCallable<Result[]>(
|
RegionServerCallable<Result[]> callable = new RegionServerCallable<Result[]>(
|
||||||
connection, table, sc.getStartRow()) {
|
connection, table, sc.getStartRow()) {
|
||||||
public Result[] call(int callTimeout) throws IOException {
|
public Result[] call(int callTimeout) throws IOException {
|
||||||
ScanRequest request = RequestConverter.buildScanRequest(getLocation()
|
ScanRequest request = RequestConverter.buildScanRequest(getLocation()
|
||||||
.getRegionInfo().getRegionName(), sc, cacheNum, true);
|
.getRegionInfo().getRegionName(), sc, cacheNum, true);
|
||||||
PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
|
PayloadCarryingRpcController controller = rpcControllerFactory.newController();
|
||||||
controller.setPriority(getTableName());
|
controller.setPriority(getTableName());
|
||||||
controller.setCallTimeout(callTimeout);
|
controller.setCallTimeout(callTimeout);
|
||||||
try {
|
try {
|
||||||
|
|
|
@ -68,6 +68,7 @@ import org.apache.hadoop.hbase.client.coprocessor.Batch;
|
||||||
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
|
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
|
||||||
import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
|
import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
|
||||||
import org.apache.hadoop.hbase.ipc.RpcClient;
|
import org.apache.hadoop.hbase.ipc.RpcClient;
|
||||||
|
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
|
||||||
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
||||||
import org.apache.hadoop.hbase.protobuf.RequestConverter;
|
import org.apache.hadoop.hbase.protobuf.RequestConverter;
|
||||||
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
|
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
|
||||||
|
@ -2200,8 +2201,8 @@ class ConnectionManager {
|
||||||
// For tests to override.
|
// For tests to override.
|
||||||
protected AsyncProcess createAsyncProcess(Configuration conf) {
|
protected AsyncProcess createAsyncProcess(Configuration conf) {
|
||||||
// No default pool available.
|
// No default pool available.
|
||||||
return new AsyncProcess(
|
return new AsyncProcess(this, conf, this.batchPool,
|
||||||
this, conf, this.batchPool, RpcRetryingCallerFactory.instantiate(conf), false);
|
RpcRetryingCallerFactory.instantiate(conf), false, RpcControllerFactory.instantiate(conf));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -61,6 +61,7 @@ import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
|
||||||
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
|
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
|
||||||
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
|
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
|
||||||
import org.apache.hadoop.hbase.ipc.RegionCoprocessorRpcChannel;
|
import org.apache.hadoop.hbase.ipc.RegionCoprocessorRpcChannel;
|
||||||
|
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
|
||||||
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
||||||
import org.apache.hadoop.hbase.protobuf.RequestConverter;
|
import org.apache.hadoop.hbase.protobuf.RequestConverter;
|
||||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
|
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
|
||||||
|
@ -146,6 +147,7 @@ public class HTable implements HTableInterface {
|
||||||
/** The Async process for batch */
|
/** The Async process for batch */
|
||||||
protected AsyncProcess multiAp;
|
protected AsyncProcess multiAp;
|
||||||
private RpcRetryingCallerFactory rpcCallerFactory;
|
private RpcRetryingCallerFactory rpcCallerFactory;
|
||||||
|
private RpcControllerFactory rpcControllerFactory;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates an object to access a HBase table.
|
* Creates an object to access a HBase table.
|
||||||
|
@ -362,8 +364,9 @@ public class HTable implements HTableInterface {
|
||||||
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
|
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
|
||||||
|
|
||||||
this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(configuration);
|
this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(configuration);
|
||||||
|
this.rpcControllerFactory = RpcControllerFactory.instantiate(configuration);
|
||||||
// puts need to track errors globally due to how the APIs currently work.
|
// puts need to track errors globally due to how the APIs currently work.
|
||||||
ap = new AsyncProcess(connection, configuration, pool, rpcCallerFactory, true);
|
ap = new AsyncProcess(connection, configuration, pool, rpcCallerFactory, true, rpcControllerFactory);
|
||||||
multiAp = this.connection.getAsyncProcess();
|
multiAp = this.connection.getAsyncProcess();
|
||||||
|
|
||||||
this.maxKeyValueSize = this.configuration.getInt(
|
this.maxKeyValueSize = this.configuration.getInt(
|
||||||
|
@ -725,7 +728,7 @@ public class HTable implements HTableInterface {
|
||||||
RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
|
RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
|
||||||
tableName, row) {
|
tableName, row) {
|
||||||
public Result call(int callTimeout) throws IOException {
|
public Result call(int callTimeout) throws IOException {
|
||||||
PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
|
PayloadCarryingRpcController controller = rpcControllerFactory.newController();
|
||||||
controller.setPriority(tableName);
|
controller.setPriority(tableName);
|
||||||
controller.setCallTimeout(callTimeout);
|
controller.setCallTimeout(callTimeout);
|
||||||
ClientProtos.GetRequest request = RequestConverter.buildGetRowOrBeforeRequest(
|
ClientProtos.GetRequest request = RequestConverter.buildGetRowOrBeforeRequest(
|
||||||
|
@ -763,10 +766,10 @@ public class HTable implements HTableInterface {
|
||||||
|
|
||||||
if (scan.isSmall()) {
|
if (scan.isSmall()) {
|
||||||
return new ClientSmallScanner(getConfiguration(), scan, getName(),
|
return new ClientSmallScanner(getConfiguration(), scan, getName(),
|
||||||
this.connection, this.rpcCallerFactory);
|
this.connection, this.rpcCallerFactory, this.rpcControllerFactory);
|
||||||
} else {
|
} else {
|
||||||
return new ClientScanner(getConfiguration(), scan,
|
return new ClientScanner(getConfiguration(), scan, getName(), this.connection,
|
||||||
getName(), this.connection);
|
this.rpcCallerFactory, this.rpcControllerFactory);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -801,7 +804,7 @@ public class HTable implements HTableInterface {
|
||||||
public Result call(int callTimeout) throws IOException {
|
public Result call(int callTimeout) throws IOException {
|
||||||
ClientProtos.GetRequest request =
|
ClientProtos.GetRequest request =
|
||||||
RequestConverter.buildGetRequest(getLocation().getRegionInfo().getRegionName(), get);
|
RequestConverter.buildGetRequest(getLocation().getRegionInfo().getRegionName(), get);
|
||||||
PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
|
PayloadCarryingRpcController controller = rpcControllerFactory.newController();
|
||||||
controller.setPriority(tableName);
|
controller.setPriority(tableName);
|
||||||
controller.setCallTimeout(callTimeout);
|
controller.setCallTimeout(callTimeout);
|
||||||
try {
|
try {
|
||||||
|
@ -902,7 +905,7 @@ public class HTable implements HTableInterface {
|
||||||
RegionServerCallable<Boolean> callable = new RegionServerCallable<Boolean>(connection,
|
RegionServerCallable<Boolean> callable = new RegionServerCallable<Boolean>(connection,
|
||||||
tableName, delete.getRow()) {
|
tableName, delete.getRow()) {
|
||||||
public Boolean call(int callTimeout) throws IOException {
|
public Boolean call(int callTimeout) throws IOException {
|
||||||
PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
|
PayloadCarryingRpcController controller = rpcControllerFactory.newController();
|
||||||
controller.setPriority(tableName);
|
controller.setPriority(tableName);
|
||||||
controller.setCallTimeout(callTimeout);
|
controller.setCallTimeout(callTimeout);
|
||||||
|
|
||||||
|
@ -1042,7 +1045,7 @@ public class HTable implements HTableInterface {
|
||||||
RegionServerCallable<Void> callable =
|
RegionServerCallable<Void> callable =
|
||||||
new RegionServerCallable<Void>(connection, getName(), rm.getRow()) {
|
new RegionServerCallable<Void>(connection, getName(), rm.getRow()) {
|
||||||
public Void call(int callTimeout) throws IOException {
|
public Void call(int callTimeout) throws IOException {
|
||||||
PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
|
PayloadCarryingRpcController controller = rpcControllerFactory.newController();
|
||||||
controller.setPriority(tableName);
|
controller.setPriority(tableName);
|
||||||
controller.setCallTimeout(callTimeout);
|
controller.setCallTimeout(callTimeout);
|
||||||
try {
|
try {
|
||||||
|
@ -1076,7 +1079,7 @@ public class HTable implements HTableInterface {
|
||||||
RegionServerCallable<Result> callable =
|
RegionServerCallable<Result> callable =
|
||||||
new RegionServerCallable<Result>(this.connection, getName(), append.getRow()) {
|
new RegionServerCallable<Result>(this.connection, getName(), append.getRow()) {
|
||||||
public Result call(int callTimeout) throws IOException {
|
public Result call(int callTimeout) throws IOException {
|
||||||
PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
|
PayloadCarryingRpcController controller = rpcControllerFactory.newController();
|
||||||
controller.setPriority(getTableName());
|
controller.setPriority(getTableName());
|
||||||
controller.setCallTimeout(callTimeout);
|
controller.setCallTimeout(callTimeout);
|
||||||
try {
|
try {
|
||||||
|
@ -1107,7 +1110,7 @@ public class HTable implements HTableInterface {
|
||||||
RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
|
RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
|
||||||
getName(), increment.getRow()) {
|
getName(), increment.getRow()) {
|
||||||
public Result call(int callTimeout) throws IOException {
|
public Result call(int callTimeout) throws IOException {
|
||||||
PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
|
PayloadCarryingRpcController controller = rpcControllerFactory.newController();
|
||||||
controller.setPriority(getTableName());
|
controller.setPriority(getTableName());
|
||||||
controller.setCallTimeout(callTimeout);
|
controller.setCallTimeout(callTimeout);
|
||||||
try {
|
try {
|
||||||
|
@ -1170,7 +1173,7 @@ public class HTable implements HTableInterface {
|
||||||
RegionServerCallable<Long> callable =
|
RegionServerCallable<Long> callable =
|
||||||
new RegionServerCallable<Long>(connection, getName(), row) {
|
new RegionServerCallable<Long>(connection, getName(), row) {
|
||||||
public Long call(int callTimeout) throws IOException {
|
public Long call(int callTimeout) throws IOException {
|
||||||
PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
|
PayloadCarryingRpcController controller = rpcControllerFactory.newController();
|
||||||
controller.setPriority(getTableName());
|
controller.setPriority(getTableName());
|
||||||
controller.setCallTimeout(callTimeout);
|
controller.setCallTimeout(callTimeout);
|
||||||
try {
|
try {
|
||||||
|
@ -1200,7 +1203,7 @@ public class HTable implements HTableInterface {
|
||||||
RegionServerCallable<Boolean> callable =
|
RegionServerCallable<Boolean> callable =
|
||||||
new RegionServerCallable<Boolean>(connection, getName(), row) {
|
new RegionServerCallable<Boolean>(connection, getName(), row) {
|
||||||
public Boolean call(int callTimeout) throws IOException {
|
public Boolean call(int callTimeout) throws IOException {
|
||||||
PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
|
PayloadCarryingRpcController controller = rpcControllerFactory.newController();
|
||||||
controller.setPriority(tableName);
|
controller.setPriority(tableName);
|
||||||
controller.setCallTimeout(callTimeout);
|
controller.setCallTimeout(callTimeout);
|
||||||
try {
|
try {
|
||||||
|
@ -1257,7 +1260,7 @@ public class HTable implements HTableInterface {
|
||||||
RegionServerCallable<Boolean> callable =
|
RegionServerCallable<Boolean> callable =
|
||||||
new RegionServerCallable<Boolean>(connection, getName(), row) {
|
new RegionServerCallable<Boolean>(connection, getName(), row) {
|
||||||
public Boolean call(int callTimeout) throws IOException {
|
public Boolean call(int callTimeout) throws IOException {
|
||||||
PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
|
PayloadCarryingRpcController controller = rpcControllerFactory.newController();
|
||||||
controller.setPriority(tableName);
|
controller.setPriority(tableName);
|
||||||
controller.setCallTimeout(callTimeout);
|
controller.setCallTimeout(callTimeout);
|
||||||
try {
|
try {
|
||||||
|
@ -1285,7 +1288,7 @@ public class HTable implements HTableInterface {
|
||||||
RegionServerCallable<Boolean> callable =
|
RegionServerCallable<Boolean> callable =
|
||||||
new RegionServerCallable<Boolean>(connection, getName(), row) {
|
new RegionServerCallable<Boolean>(connection, getName(), row) {
|
||||||
public Boolean call(int callTimeout) throws IOException {
|
public Boolean call(int callTimeout) throws IOException {
|
||||||
PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
|
PayloadCarryingRpcController controller = rpcControllerFactory.newController();
|
||||||
controller.setPriority(tableName);
|
controller.setPriority(tableName);
|
||||||
controller.setCallTimeout(callTimeout);
|
controller.setCallTimeout(callTimeout);
|
||||||
try {
|
try {
|
||||||
|
@ -1761,8 +1764,10 @@ public class HTable implements HTableInterface {
|
||||||
final List<String> callbackErrorServers = new ArrayList<String>();
|
final List<String> callbackErrorServers = new ArrayList<String>();
|
||||||
Object[] results = new Object[execs.size()];
|
Object[] results = new Object[execs.size()];
|
||||||
|
|
||||||
AsyncProcess asyncProcess = new AsyncProcess(connection, configuration, pool,
|
AsyncProcess asyncProcess =
|
||||||
RpcRetryingCallerFactory.instantiate(configuration), true);
|
new AsyncProcess(connection, configuration, pool,
|
||||||
|
RpcRetryingCallerFactory.instantiate(configuration), true,
|
||||||
|
RpcControllerFactory.instantiate(configuration));
|
||||||
AsyncRequestFuture future = asyncProcess.submitAll(tableName, execs,
|
AsyncRequestFuture future = asyncProcess.submitAll(tableName, execs,
|
||||||
new Callback<ClientProtos.CoprocessorServiceResult>() {
|
new Callback<ClientProtos.CoprocessorServiceResult>() {
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -31,14 +31,15 @@ import org.apache.hadoop.hbase.HRegionLocation;
|
||||||
import org.apache.hadoop.hbase.ServerName;
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
|
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
|
||||||
|
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
|
||||||
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
||||||
import org.apache.hadoop.hbase.protobuf.RequestConverter;
|
import org.apache.hadoop.hbase.protobuf.RequestConverter;
|
||||||
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
|
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
|
||||||
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
|
|
||||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
|
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
|
||||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
|
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
|
||||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
|
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
|
||||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
|
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
|
||||||
|
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
|
||||||
|
|
||||||
import com.google.protobuf.ServiceException;
|
import com.google.protobuf.ServiceException;
|
||||||
|
|
||||||
|
@ -51,10 +52,12 @@ import com.google.protobuf.ServiceException;
|
||||||
class MultiServerCallable<R> extends RegionServerCallable<MultiResponse> {
|
class MultiServerCallable<R> extends RegionServerCallable<MultiResponse> {
|
||||||
private final MultiAction<R> multiAction;
|
private final MultiAction<R> multiAction;
|
||||||
private final boolean cellBlock;
|
private final boolean cellBlock;
|
||||||
|
private RpcControllerFactory rpcFactory;
|
||||||
|
|
||||||
MultiServerCallable(final HConnection connection, final TableName tableName,
|
MultiServerCallable(final HConnection connection, final TableName tableName,
|
||||||
final ServerName location, final MultiAction<R> multi) {
|
final ServerName location, RpcControllerFactory rpcFactory, final MultiAction<R> multi) {
|
||||||
super(connection, tableName, null);
|
super(connection, tableName, null);
|
||||||
|
this.rpcFactory = rpcFactory;
|
||||||
this.multiAction = multi;
|
this.multiAction = multi;
|
||||||
// RegionServerCallable has HRegionLocation field, but this is a multi-region request.
|
// RegionServerCallable has HRegionLocation field, but this is a multi-region request.
|
||||||
// Using region info from parent HRegionLocation would be a mistake for this class; so
|
// Using region info from parent HRegionLocation would be a mistake for this class; so
|
||||||
|
@ -115,7 +118,7 @@ class MultiServerCallable<R> extends RegionServerCallable<MultiResponse> {
|
||||||
|
|
||||||
// Controller optionally carries cell data over the proxy/service boundary and also
|
// Controller optionally carries cell data over the proxy/service boundary and also
|
||||||
// optionally ferries cell response data back out again.
|
// optionally ferries cell response data back out again.
|
||||||
PayloadCarryingRpcController controller = new PayloadCarryingRpcController(cells);
|
PayloadCarryingRpcController controller = rpcFactory.newController(cells);
|
||||||
controller.setPriority(getTableName());
|
controller.setPriority(getTableName());
|
||||||
controller.setCallTimeout(callTimeout);
|
controller.setCallTimeout(callTimeout);
|
||||||
ClientProtos.MultiResponse responseProto;
|
ClientProtos.MultiResponse responseProto;
|
||||||
|
|
|
@ -125,8 +125,9 @@ public class ReversedClientScanner extends ClientScanner {
|
||||||
protected ScannerCallable getScannerCallable(byte[] localStartKey,
|
protected ScannerCallable getScannerCallable(byte[] localStartKey,
|
||||||
int nbRows, byte[] locateStartRow) {
|
int nbRows, byte[] locateStartRow) {
|
||||||
scan.setStartRow(localStartKey);
|
scan.setStartRow(localStartKey);
|
||||||
ScannerCallable s = new ReversedScannerCallable(getConnection(),
|
ScannerCallable s =
|
||||||
getTable(), scan, this.scanMetrics, locateStartRow);
|
new ReversedScannerCallable(getConnection(), getTable(), scan, this.scanMetrics,
|
||||||
|
locateStartRow, this.rpcControllerFactory);
|
||||||
s.setCaching(nbRows);
|
s.setCaching(nbRows);
|
||||||
return s;
|
return s;
|
||||||
}
|
}
|
||||||
|
|
|
@ -29,8 +29,11 @@ import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.HRegionLocation;
|
import org.apache.hadoop.hbase.HRegionLocation;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
|
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
|
||||||
|
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
|
||||||
|
import com.google.protobuf.RpcController;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A reversed ScannerCallable which supports backward scanning.
|
* A reversed ScannerCallable which supports backward scanning.
|
||||||
*/
|
*/
|
||||||
|
@ -45,17 +48,28 @@ public class ReversedScannerCallable extends ScannerCallable {
|
||||||
protected final byte[] locateStartRow;
|
protected final byte[] locateStartRow;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
|
||||||
* @param connection
|
* @param connection
|
||||||
* @param tableName
|
* @param tableName
|
||||||
* @param scan
|
* @param scan
|
||||||
* @param scanMetrics
|
* @param scanMetrics
|
||||||
* @param locateStartRow The start row for locating regions
|
* @param locateStartRow The start row for locating regions
|
||||||
|
* @param rpcFactory to create an {@link RpcController} to talk to the regionserver
|
||||||
*/
|
*/
|
||||||
|
public ReversedScannerCallable(HConnection connection, TableName tableName, Scan scan,
|
||||||
|
ScanMetrics scanMetrics, byte[] locateStartRow, RpcControllerFactory rpcFactory) {
|
||||||
|
super(connection, tableName, scan, scanMetrics, rpcFactory);
|
||||||
|
this.locateStartRow = locateStartRow;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @deprecated use
|
||||||
|
* {@link #ReversedScannerCallable(HConnection, TableName, Scan, ScanMetrics, byte[], RpcControllerFactory)}
|
||||||
|
*/
|
||||||
|
@Deprecated
|
||||||
public ReversedScannerCallable(HConnection connection, TableName tableName,
|
public ReversedScannerCallable(HConnection connection, TableName tableName,
|
||||||
Scan scan, ScanMetrics scanMetrics, byte[] locateStartRow) {
|
Scan scan, ScanMetrics scanMetrics, byte[] locateStartRow) {
|
||||||
super(connection, tableName, scan, scanMetrics);
|
this(connection, tableName, scan, scanMetrics, locateStartRow, RpcControllerFactory
|
||||||
this.locateStartRow = locateStartRow;
|
.instantiate(connection.getConfiguration()));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.UnknownScannerException;
|
import org.apache.hadoop.hbase.UnknownScannerException;
|
||||||
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
|
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
|
||||||
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
|
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
|
||||||
|
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
|
||||||
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
||||||
import org.apache.hadoop.hbase.protobuf.RequestConverter;
|
import org.apache.hadoop.hbase.protobuf.RequestConverter;
|
||||||
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
|
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
|
||||||
|
@ -46,6 +47,7 @@ import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
|
||||||
import org.apache.hadoop.ipc.RemoteException;
|
import org.apache.hadoop.ipc.RemoteException;
|
||||||
import org.apache.hadoop.net.DNS;
|
import org.apache.hadoop.net.DNS;
|
||||||
|
|
||||||
|
import com.google.protobuf.RpcController;
|
||||||
import com.google.protobuf.ServiceException;
|
import com.google.protobuf.ServiceException;
|
||||||
import com.google.protobuf.TextFormat;
|
import com.google.protobuf.TextFormat;
|
||||||
|
|
||||||
|
@ -81,22 +83,25 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
|
||||||
// indicate if it is a remote server call
|
// indicate if it is a remote server call
|
||||||
protected boolean isRegionServerRemote = true;
|
protected boolean isRegionServerRemote = true;
|
||||||
private long nextCallSeq = 0;
|
private long nextCallSeq = 0;
|
||||||
|
private RpcControllerFactory rpcFactory;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param connection which connection
|
* @param connection which connection
|
||||||
* @param tableName table callable is on
|
* @param tableName table callable is on
|
||||||
* @param scan the scan to execute
|
* @param scan the scan to execute
|
||||||
* @param scanMetrics the ScanMetrics to used, if it is null, ScannerCallable
|
* @param scanMetrics the ScanMetrics to used, if it is null, ScannerCallable won't collect
|
||||||
* won't collect metrics
|
* metrics
|
||||||
|
* @param rpcControllerFactory factory to use when creating {@link RpcController}
|
||||||
*/
|
*/
|
||||||
public ScannerCallable (HConnection connection, TableName tableName, Scan scan,
|
public ScannerCallable (HConnection connection, TableName tableName, Scan scan,
|
||||||
ScanMetrics scanMetrics) {
|
ScanMetrics scanMetrics, RpcControllerFactory rpcControllerFactory) {
|
||||||
super(connection, tableName, scan.getStartRow());
|
super(connection, tableName, scan.getStartRow());
|
||||||
this.scan = scan;
|
this.scan = scan;
|
||||||
this.scanMetrics = scanMetrics;
|
this.scanMetrics = scanMetrics;
|
||||||
Configuration conf = connection.getConfiguration();
|
Configuration conf = connection.getConfiguration();
|
||||||
logScannerActivity = conf.getBoolean(LOG_SCANNER_ACTIVITY, false);
|
logScannerActivity = conf.getBoolean(LOG_SCANNER_ACTIVITY, false);
|
||||||
logCutOffLatency = conf.getInt(LOG_SCANNER_LATENCY_CUTOFF, 1000);
|
logCutOffLatency = conf.getInt(LOG_SCANNER_LATENCY_CUTOFF, 1000);
|
||||||
|
this.rpcFactory = rpcControllerFactory;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -105,7 +110,8 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
|
||||||
@Deprecated
|
@Deprecated
|
||||||
public ScannerCallable (HConnection connection, final byte [] tableName, Scan scan,
|
public ScannerCallable (HConnection connection, final byte [] tableName, Scan scan,
|
||||||
ScanMetrics scanMetrics) {
|
ScanMetrics scanMetrics) {
|
||||||
this(connection, TableName.valueOf(tableName), scan, scanMetrics);
|
this(connection, TableName.valueOf(tableName), scan, scanMetrics, RpcControllerFactory
|
||||||
|
.instantiate(connection.getConfiguration()));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -161,7 +167,7 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
|
||||||
incRPCcallsMetrics();
|
incRPCcallsMetrics();
|
||||||
request = RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq);
|
request = RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq);
|
||||||
ScanResponse response = null;
|
ScanResponse response = null;
|
||||||
PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
|
PayloadCarryingRpcController controller = rpcFactory.newController();
|
||||||
controller.setPriority(getTableName());
|
controller.setPriority(getTableName());
|
||||||
controller.setCallTimeout(callTimeout);
|
controller.setCallTimeout(callTimeout);
|
||||||
try {
|
try {
|
||||||
|
|
|
@ -0,0 +1,58 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.ipc;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.CellScanner;
|
||||||
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Simple delegating controller for use with the {@link RpcControllerFactory} to help override
|
||||||
|
* standard behavior of a {@link PayloadCarryingRpcController}.
|
||||||
|
*/
|
||||||
|
public class DelegatingPayloadCarryingRpcController extends PayloadCarryingRpcController {
|
||||||
|
private PayloadCarryingRpcController delegate;
|
||||||
|
|
||||||
|
public DelegatingPayloadCarryingRpcController(PayloadCarryingRpcController delegate) {
|
||||||
|
this.delegate = delegate;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public CellScanner cellScanner() {
|
||||||
|
return delegate.cellScanner();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setCellScanner(final CellScanner cellScanner) {
|
||||||
|
delegate.setCellScanner(cellScanner);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setPriority(int priority) {
|
||||||
|
delegate.setPriority(priority);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setPriority(final TableName tn) {
|
||||||
|
delegate.setPriority(tn);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getPriority() {
|
||||||
|
return delegate.getPriority();
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,59 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.ipc;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hbase.CellScannable;
|
||||||
|
import org.apache.hadoop.hbase.CellScanner;
|
||||||
|
import org.apache.hadoop.hbase.util.ReflectionUtils;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Factory to create a {@link PayloadCarryingRpcController}
|
||||||
|
*/
|
||||||
|
public class RpcControllerFactory {
|
||||||
|
|
||||||
|
public static final String CUSTOM_CONTROLLER_CONF_KEY = "hbase.rpc.controllerfactory.class";
|
||||||
|
protected final Configuration conf;
|
||||||
|
|
||||||
|
public RpcControllerFactory(Configuration conf) {
|
||||||
|
this.conf = conf;
|
||||||
|
}
|
||||||
|
|
||||||
|
public PayloadCarryingRpcController newController() {
|
||||||
|
return new PayloadCarryingRpcController();
|
||||||
|
}
|
||||||
|
|
||||||
|
public PayloadCarryingRpcController newController(final CellScanner cellScanner) {
|
||||||
|
return new PayloadCarryingRpcController(cellScanner);
|
||||||
|
}
|
||||||
|
|
||||||
|
public PayloadCarryingRpcController newController(final List<CellScannable> cellIterables) {
|
||||||
|
return new PayloadCarryingRpcController(cellIterables);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public static RpcControllerFactory instantiate(Configuration configuration) {
|
||||||
|
String rpcControllerFactoryClazz =
|
||||||
|
configuration.get(CUSTOM_CONTROLLER_CONF_KEY,
|
||||||
|
RpcControllerFactory.class.getName());
|
||||||
|
return ReflectionUtils.instantiateWithCustomCtor(rpcControllerFactoryClazz,
|
||||||
|
new Class[] { Configuration.class }, new Object[] { configuration });
|
||||||
|
}
|
||||||
|
}
|
|
@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.ServerName;
|
||||||
import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture;
|
import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture;
|
||||||
import org.apache.hadoop.hbase.client.coprocessor.Batch;
|
import org.apache.hadoop.hbase.client.coprocessor.Batch;
|
||||||
import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
|
import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
|
||||||
|
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.hbase.util.Threads;
|
import org.apache.hadoop.hbase.util.Threads;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
|
@ -125,14 +126,14 @@ public class TestAsyncProcess {
|
||||||
public MyAsyncProcess(ClusterConnection hc, Configuration conf, AtomicInteger nbThreads) {
|
public MyAsyncProcess(ClusterConnection hc, Configuration conf, AtomicInteger nbThreads) {
|
||||||
super(hc, conf, new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS,
|
super(hc, conf, new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS,
|
||||||
new SynchronousQueue<Runnable>(), new CountingThreadFactory(nbThreads)),
|
new SynchronousQueue<Runnable>(), new CountingThreadFactory(nbThreads)),
|
||||||
new RpcRetryingCallerFactory(conf), false);
|
new RpcRetryingCallerFactory(conf), false, new RpcControllerFactory(conf));
|
||||||
}
|
}
|
||||||
|
|
||||||
public MyAsyncProcess(
|
public MyAsyncProcess(
|
||||||
ClusterConnection hc, Configuration conf, boolean useGlobalErrors) {
|
ClusterConnection hc, Configuration conf, boolean useGlobalErrors) {
|
||||||
super(hc, conf, new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS,
|
super(hc, conf, new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS,
|
||||||
new SynchronousQueue<Runnable>(), new CountingThreadFactory(new AtomicInteger())),
|
new SynchronousQueue<Runnable>(), new CountingThreadFactory(new AtomicInteger())),
|
||||||
new RpcRetryingCallerFactory(conf), useGlobalErrors);
|
new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.client.HConnection;
|
||||||
import org.apache.hadoop.hbase.client.RegionServerCallable;
|
import org.apache.hadoop.hbase.client.RegionServerCallable;
|
||||||
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
|
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
|
||||||
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
|
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
|
||||||
|
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
|
||||||
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
||||||
import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
|
import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
|
||||||
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
|
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
|
||||||
|
@ -69,6 +70,7 @@ public class WALEditsReplaySink {
|
||||||
private final AtomicLong totalReplayedEdits = new AtomicLong();
|
private final AtomicLong totalReplayedEdits = new AtomicLong();
|
||||||
private final boolean skipErrors;
|
private final boolean skipErrors;
|
||||||
private final int replayTimeout;
|
private final int replayTimeout;
|
||||||
|
private RpcControllerFactory rpcControllerFactory;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a sink for WAL log entries replay
|
* Create a sink for WAL log entries replay
|
||||||
|
@ -87,6 +89,7 @@ public class WALEditsReplaySink {
|
||||||
HConstants.DEFAULT_HREGION_EDITS_REPLAY_SKIP_ERRORS);
|
HConstants.DEFAULT_HREGION_EDITS_REPLAY_SKIP_ERRORS);
|
||||||
// a single replay operation time out and default is 60 seconds
|
// a single replay operation time out and default is 60 seconds
|
||||||
this.replayTimeout = conf.getInt("hbase.regionserver.logreplay.timeout", 60000);
|
this.replayTimeout = conf.getInt("hbase.regionserver.logreplay.timeout", 60000);
|
||||||
|
this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -211,7 +214,7 @@ public class WALEditsReplaySink {
|
||||||
|
|
||||||
Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p =
|
Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p =
|
||||||
ReplicationProtbufUtil.buildReplicateWALEntryRequest(entriesArray);
|
ReplicationProtbufUtil.buildReplicateWALEntryRequest(entriesArray);
|
||||||
PayloadCarryingRpcController controller = new PayloadCarryingRpcController(p.getSecond());
|
PayloadCarryingRpcController controller = rpcControllerFactory.newController(p.getSecond());
|
||||||
try {
|
try {
|
||||||
remoteSvr.replay(controller, p.getFirst());
|
remoteSvr.replay(controller, p.getFirst());
|
||||||
} catch (ServiceException se) {
|
} catch (ServiceException se) {
|
||||||
|
|
|
@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.ZooKeeperConnectionException;
|
||||||
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
|
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
|
||||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
|
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
|
||||||
import org.apache.hadoop.hbase.client.ConnectionManager.HConnectionImplementation;
|
import org.apache.hadoop.hbase.client.ConnectionManager.HConnectionImplementation;
|
||||||
|
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
|
||||||
import org.mockito.Mockito;
|
import org.mockito.Mockito;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -121,8 +122,9 @@ public class HConnectionTestingUtility {
|
||||||
}
|
}
|
||||||
NonceGenerator ng = Mockito.mock(NonceGenerator.class);
|
NonceGenerator ng = Mockito.mock(NonceGenerator.class);
|
||||||
Mockito.when(c.getNonceGenerator()).thenReturn(ng);
|
Mockito.when(c.getNonceGenerator()).thenReturn(ng);
|
||||||
Mockito.when(c.getAsyncProcess()).thenReturn(new AsyncProcess(
|
Mockito.when(c.getAsyncProcess()).thenReturn(
|
||||||
c, conf, null, RpcRetryingCallerFactory.instantiate(conf), false));
|
new AsyncProcess(c, conf, null, RpcRetryingCallerFactory.instantiate(conf), false,
|
||||||
|
RpcControllerFactory.instantiate(conf)));
|
||||||
Mockito.doNothing().when(c).incCount();
|
Mockito.doNothing().when(c).incCount();
|
||||||
Mockito.doNothing().when(c).decCount();
|
Mockito.doNothing().when(c).decCount();
|
||||||
return c;
|
return c;
|
||||||
|
|
|
@ -0,0 +1,203 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.client;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hbase.CellScannable;
|
||||||
|
import org.apache.hadoop.hbase.CellScanner;
|
||||||
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
|
import org.apache.hadoop.hbase.MediumTests;
|
||||||
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
|
||||||
|
import org.apache.hadoop.hbase.coprocessor.ProtobufCoprocessorService;
|
||||||
|
import org.apache.hadoop.hbase.ipc.DelegatingPayloadCarryingRpcController;
|
||||||
|
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
|
||||||
|
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
|
import com.google.common.collect.Lists;
|
||||||
|
|
||||||
|
@Category(MediumTests.class)
|
||||||
|
public class TestRpcControllerFactory {
|
||||||
|
|
||||||
|
public static class StaticRpcControllerFactory extends RpcControllerFactory {
|
||||||
|
|
||||||
|
public StaticRpcControllerFactory(Configuration conf) {
|
||||||
|
super(conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
public PayloadCarryingRpcController newController() {
|
||||||
|
return new CountingRpcController(super.newController());
|
||||||
|
}
|
||||||
|
|
||||||
|
public PayloadCarryingRpcController newController(final CellScanner cellScanner) {
|
||||||
|
return new CountingRpcController(super.newController(cellScanner));
|
||||||
|
}
|
||||||
|
|
||||||
|
public PayloadCarryingRpcController newController(final List<CellScannable> cellIterables) {
|
||||||
|
return new CountingRpcController(super.newController(cellIterables));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class CountingRpcController extends DelegatingPayloadCarryingRpcController {
|
||||||
|
|
||||||
|
private static AtomicInteger INT_PRIORITY = new AtomicInteger();
|
||||||
|
private static AtomicInteger TABLE_PRIORITY = new AtomicInteger();
|
||||||
|
|
||||||
|
public CountingRpcController(PayloadCarryingRpcController delegate) {
|
||||||
|
super(delegate);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setPriority(int priority) {
|
||||||
|
super.setPriority(priority);
|
||||||
|
INT_PRIORITY.incrementAndGet();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setPriority(TableName tn) {
|
||||||
|
super.setPriority(tn);
|
||||||
|
// ignore counts for system tables - it could change and we really only want to check on what
|
||||||
|
// the client should change
|
||||||
|
if (!tn.isSystemTable()) {
|
||||||
|
TABLE_PRIORITY.incrementAndGet();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setup() throws Exception {
|
||||||
|
// load an endpoint so we have an endpoint to test - it doesn't matter which one, but
|
||||||
|
// this is already in tests, so we can just use it.
|
||||||
|
Configuration conf = UTIL.getConfiguration();
|
||||||
|
conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
|
||||||
|
ProtobufCoprocessorService.class.getName());
|
||||||
|
|
||||||
|
UTIL.startMiniCluster();
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterClass
|
||||||
|
public static void teardown() throws Exception {
|
||||||
|
UTIL.shutdownMiniCluster();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* check some of the methods and make sure we are incrementing each time. Its a bit tediuous to
|
||||||
|
* cover all methods here and really is a bit brittle since we can always add new methods but
|
||||||
|
* won't be sure to add them here. So we just can cover the major ones.
|
||||||
|
* @throws Exception on failure
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testCountController() throws Exception {
|
||||||
|
Configuration conf = new Configuration(UTIL.getConfiguration());
|
||||||
|
// setup our custom controller
|
||||||
|
conf.set(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY,
|
||||||
|
StaticRpcControllerFactory.class.getName());
|
||||||
|
|
||||||
|
TableName name = TableName.valueOf("testcustomcontroller");
|
||||||
|
UTIL.createTable(name, fam1).close();
|
||||||
|
|
||||||
|
// change one of the connection properties so we get a new HConnection with our configuration
|
||||||
|
conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT + 1);
|
||||||
|
|
||||||
|
HTable table = new HTable(conf, name);
|
||||||
|
table.setAutoFlushTo(false);
|
||||||
|
byte[] row = Bytes.toBytes("row");
|
||||||
|
Put p = new Put(row);
|
||||||
|
p.add(fam1, fam1, Bytes.toBytes("val0"));
|
||||||
|
table.put(p);
|
||||||
|
table.flushCommits();
|
||||||
|
Integer counter = 1;
|
||||||
|
counter = verifyCount(counter);
|
||||||
|
|
||||||
|
Delete d = new Delete(row);
|
||||||
|
d.deleteColumn(fam1, fam1);
|
||||||
|
table.delete(d);
|
||||||
|
counter = verifyCount(counter);
|
||||||
|
|
||||||
|
Put p2 = new Put(row);
|
||||||
|
p2.add(fam1, Bytes.toBytes("qual"), Bytes.toBytes("val1"));
|
||||||
|
table.batch(Lists.newArrayList(p, p2), new Object[2]);
|
||||||
|
// this only goes to a single server, so we don't need to change the count here
|
||||||
|
counter = verifyCount(counter);
|
||||||
|
|
||||||
|
Append append = new Append(row);
|
||||||
|
append.add(fam1, fam1, Bytes.toBytes("val2"));
|
||||||
|
table.append(append);
|
||||||
|
counter = verifyCount(counter);
|
||||||
|
|
||||||
|
// and check the major lookup calls as well
|
||||||
|
Get g = new Get(row);
|
||||||
|
table.get(g);
|
||||||
|
counter = verifyCount(counter);
|
||||||
|
|
||||||
|
ResultScanner scan = table.getScanner(fam1);
|
||||||
|
scan.next();
|
||||||
|
scan.close();
|
||||||
|
counter = verifyCount(counter);
|
||||||
|
|
||||||
|
Get g2 = new Get(row);
|
||||||
|
table.get(Lists.newArrayList(g, g2));
|
||||||
|
// same server, so same as above for not changing count
|
||||||
|
counter = verifyCount(counter);
|
||||||
|
|
||||||
|
// make sure all the scanner types are covered
|
||||||
|
Scan scanInfo = new Scan(row);
|
||||||
|
// regular small
|
||||||
|
scanInfo.setSmall(true);
|
||||||
|
counter = doScan(table, scanInfo, counter);
|
||||||
|
|
||||||
|
// reversed, small
|
||||||
|
scanInfo.setReversed(true);
|
||||||
|
counter = doScan(table, scanInfo, counter);
|
||||||
|
|
||||||
|
// reversed, regular
|
||||||
|
scanInfo.setSmall(false);
|
||||||
|
counter = doScan(table, scanInfo, counter);
|
||||||
|
|
||||||
|
table.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
int doScan(HTable table, Scan scan, int expectedCount) throws IOException {
|
||||||
|
ResultScanner results = table.getScanner(scan);
|
||||||
|
results.next();
|
||||||
|
results.close();
|
||||||
|
return verifyCount(expectedCount);
|
||||||
|
}
|
||||||
|
|
||||||
|
int verifyCount(Integer counter) {
|
||||||
|
assertEquals(counter.intValue(), CountingRpcController.TABLE_PRIORITY.get());
|
||||||
|
assertEquals(0, CountingRpcController.INT_PRIORITY.get());
|
||||||
|
return counter + 1;
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue