HBASE-15816 Provide client with ability to set priority on Operations
Signed-off-by: Andrew Purtell <apurtell@apache.org>
This commit is contained in:
parent
6f1cc2c89f
commit
26247996d2
@ -34,11 +34,17 @@ public class Action<R> implements Comparable<R> {
|
||||
private int originalIndex;
|
||||
private long nonce = HConstants.NO_NONCE;
|
||||
private int replicaId = RegionReplicaUtil.DEFAULT_REPLICA_ID;
|
||||
private int priority;
|
||||
|
||||
public Action(Row action, int originalIndex) {
|
||||
this(action, originalIndex, HConstants.PRIORITY_UNSET);
|
||||
}
|
||||
|
||||
public Action(Row action, int originalIndex, int priority) {
|
||||
super();
|
||||
this.action = action;
|
||||
this.originalIndex = originalIndex;
|
||||
this.priority = priority;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -75,6 +81,8 @@ public class Action<R> implements Comparable<R> {
|
||||
return replicaId;
|
||||
}
|
||||
|
||||
public int getPriority() { return priority; }
|
||||
|
||||
@SuppressWarnings("rawtypes")
|
||||
@Override
|
||||
public int compareTo(Object o) {
|
||||
|
@ -86,6 +86,7 @@ public class Append extends Mutation {
|
||||
for (Map.Entry<String, byte[]> entry : a.getAttributesMap().entrySet()) {
|
||||
this.setAttribute(entry.getKey(), entry.getValue());
|
||||
}
|
||||
this.setPriority(a.getPriority());
|
||||
}
|
||||
|
||||
/** Create a Append operation for the specified row.
|
||||
@ -183,6 +184,11 @@ public class Append extends Mutation {
|
||||
return (Append) super.setACL(perms);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Append setPriority(int priority) {
|
||||
return (Append) super.setPriority(priority);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Append setTTL(long ttl) {
|
||||
return (Append) super.setTTL(ttl);
|
||||
|
@ -504,7 +504,11 @@ class AsyncProcess {
|
||||
LOG.error("Failed to get region location ", ex);
|
||||
// This action failed before creating ars. Retain it, but do not add to submit list.
|
||||
// We will then add it to ars in an already-failed state.
|
||||
retainedActions.add(new Action<Row>(r, ++posInList));
|
||||
int priority = HConstants.NORMAL_QOS;
|
||||
if (r instanceof Mutation) {
|
||||
priority = ((Mutation) r).getPriority();
|
||||
}
|
||||
retainedActions.add(new Action<Row>(r, ++posInList, priority));
|
||||
locationErrors.add(ex);
|
||||
locationErrorRows.add(posInList);
|
||||
it.remove();
|
||||
@ -516,7 +520,11 @@ class AsyncProcess {
|
||||
break;
|
||||
}
|
||||
if (code == ReturnCode.INCLUDE) {
|
||||
Action<Row> action = new Action<Row>(r, ++posInList);
|
||||
int priority = HConstants.NORMAL_QOS;
|
||||
if (r instanceof Mutation) {
|
||||
priority = ((Mutation) r).getPriority();
|
||||
}
|
||||
Action<Row> action = new Action<Row>(r, ++posInList, priority);
|
||||
setNonce(ng, r, action);
|
||||
retainedActions.add(action);
|
||||
// TODO: replica-get is not supported on this path
|
||||
@ -619,6 +627,7 @@ class AsyncProcess {
|
||||
// The position will be used by the processBatch to match the object array returned.
|
||||
int posInList = -1;
|
||||
NonceGenerator ng = this.connection.getNonceGenerator();
|
||||
int highestPriority = HConstants.PRIORITY_UNSET;
|
||||
for (Row r : rows) {
|
||||
posInList++;
|
||||
if (r instanceof Put) {
|
||||
@ -626,8 +635,9 @@ class AsyncProcess {
|
||||
if (put.isEmpty()) {
|
||||
throw new IllegalArgumentException("No columns to insert for #" + (posInList+1)+ " item");
|
||||
}
|
||||
highestPriority = Math.max(put.getPriority(), highestPriority);
|
||||
}
|
||||
Action<Row> action = new Action<Row>(r, posInList);
|
||||
Action<Row> action = new Action<Row>(r, posInList, highestPriority);
|
||||
setNonce(ng, r, action);
|
||||
actions.add(action);
|
||||
}
|
||||
@ -1782,7 +1792,7 @@ class AsyncProcess {
|
||||
protected MultiServerCallable<Row> createCallable(final ServerName server,
|
||||
TableName tableName, final MultiAction<Row> multi) {
|
||||
return new MultiServerCallable<Row>(connection, tableName, server,
|
||||
AsyncProcess.this.rpcFactory, multi, rpcTimeout, tracker);
|
||||
AsyncProcess.this.rpcFactory, multi, rpcTimeout, tracker, multi.getPriority());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -147,6 +147,7 @@ public class Delete extends Mutation implements Comparable<Row> {
|
||||
for (Map.Entry<String, byte[]> entry : d.getAttributesMap().entrySet()) {
|
||||
this.setAttribute(entry.getKey(), entry.getValue());
|
||||
}
|
||||
super.setPriority(d.getPriority());
|
||||
}
|
||||
|
||||
/**
|
||||
@ -478,4 +479,10 @@ public class Delete extends Mutation implements Comparable<Row> {
|
||||
public Delete setTTL(long ttl) {
|
||||
throw new UnsupportedOperationException("Setting TTLs on Deletes is not supported");
|
||||
}
|
||||
|
||||
@Override
|
||||
public Delete setPriority(int priority) {
|
||||
return (Delete) super.setPriority(priority);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -130,6 +130,7 @@ public class Get extends Query
|
||||
TimeRange tr = entry.getValue();
|
||||
setColumnFamilyTimeRange(entry.getKey(), tr.getMin(), tr.getMax());
|
||||
}
|
||||
super.setPriority(get.getPriority());
|
||||
}
|
||||
|
||||
public boolean isCheckExistenceOnly() {
|
||||
@ -511,4 +512,9 @@ public class Get extends Query
|
||||
return (Get) super.setIsolationLevel(level);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Get setPriority(int priority) {
|
||||
return (Get) super.setPriority(priority);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -846,13 +846,14 @@ public class HTable implements HTableInterface, RegionLocator {
|
||||
// Good old call.
|
||||
final Get getReq = get;
|
||||
RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
|
||||
getName(), get.getRow()) {
|
||||
getName(), get.getRow(), get.getPriority()) {
|
||||
@Override
|
||||
public Result call(int callTimeout) throws IOException {
|
||||
ClientProtos.GetRequest request =
|
||||
RequestConverter.buildGetRequest(getLocation().getRegionInfo().getRegionName(), getReq);
|
||||
HBaseRpcController controller = rpcControllerFactory.newController();
|
||||
controller.setPriority(tableName);
|
||||
controller.setPriority(getPriority());
|
||||
controller.setCallTimeout(callTimeout);
|
||||
try {
|
||||
ClientProtos.GetResponse response = getStub().get(controller, request);
|
||||
@ -973,11 +974,12 @@ public class HTable implements HTableInterface, RegionLocator {
|
||||
public void delete(final Delete delete)
|
||||
throws IOException {
|
||||
RegionServerCallable<Boolean> callable = new RegionServerCallable<Boolean>(connection,
|
||||
tableName, delete.getRow()) {
|
||||
tableName, delete.getRow(), delete.getPriority()) {
|
||||
@Override
|
||||
public Boolean call(int callTimeout) throws IOException {
|
||||
HBaseRpcController controller = rpcControllerFactory.newController();
|
||||
controller.setPriority(tableName);
|
||||
controller.setPriority(getPriority());
|
||||
controller.setCallTimeout(callTimeout);
|
||||
|
||||
try {
|
||||
@ -1055,6 +1057,7 @@ public class HTable implements HTableInterface, RegionLocator {
|
||||
public MultiResponse call(int callTimeout) throws IOException {
|
||||
controller.reset();
|
||||
controller.setPriority(tableName);
|
||||
controller.setPriority(rm.getMaxPriority());
|
||||
int remainingTime = tracker.getRemainingTime(callTimeout);
|
||||
if (remainingTime == 0) {
|
||||
throw new DoNotRetryIOException("Timeout for mutate row");
|
||||
@ -1103,12 +1106,12 @@ public class HTable implements HTableInterface, RegionLocator {
|
||||
NonceGenerator ng = this.connection.getNonceGenerator();
|
||||
final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
|
||||
RegionServerCallable<Result> callable =
|
||||
new RegionServerCallable<Result>(this.connection, getName(), append.getRow()) {
|
||||
new RegionServerCallable<Result>(this.connection, getName(), append.getRow(), append.getPriority()) {
|
||||
@Override
|
||||
public Result call(int callTimeout) throws IOException {
|
||||
HBaseRpcController controller = rpcControllerFactory.newController();
|
||||
controller.setPriority(getTableName());
|
||||
controller.setCallTimeout(callTimeout);
|
||||
controller.setCallTimeout(getPriority());
|
||||
try {
|
||||
MutateRequest request = RequestConverter.buildMutateRequest(
|
||||
getLocation().getRegionInfo().getRegionName(), append, nonceGroup, nonce);
|
||||
@ -1136,11 +1139,12 @@ public class HTable implements HTableInterface, RegionLocator {
|
||||
NonceGenerator ng = this.connection.getNonceGenerator();
|
||||
final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
|
||||
RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
|
||||
getName(), increment.getRow()) {
|
||||
getName(), increment.getRow(), increment.getPriority()) {
|
||||
@Override
|
||||
public Result call(int callTimeout) throws IOException {
|
||||
HBaseRpcController controller = rpcControllerFactory.newController();
|
||||
controller.setPriority(getTableName());
|
||||
controller.setPriority(getPriority());
|
||||
controller.setCallTimeout(callTimeout);
|
||||
try {
|
||||
MutateRequest request = RequestConverter.buildMutateRequest(
|
||||
@ -1236,11 +1240,12 @@ public class HTable implements HTableInterface, RegionLocator {
|
||||
final Put put)
|
||||
throws IOException {
|
||||
RegionServerCallable<Boolean> callable =
|
||||
new RegionServerCallable<Boolean>(connection, getName(), row) {
|
||||
new RegionServerCallable<Boolean>(connection, getName(), row, put.getPriority()) {
|
||||
@Override
|
||||
public Boolean call(int callTimeout) throws IOException {
|
||||
HBaseRpcController controller = rpcControllerFactory.newController();
|
||||
controller.setPriority(tableName);
|
||||
controller.setPriority(getPriority());
|
||||
controller.setCallTimeout(callTimeout);
|
||||
try {
|
||||
MutateRequest request = RequestConverter.buildMutateRequest(
|
||||
@ -1266,11 +1271,12 @@ public class HTable implements HTableInterface, RegionLocator {
|
||||
final Put put)
|
||||
throws IOException {
|
||||
RegionServerCallable<Boolean> callable =
|
||||
new RegionServerCallable<Boolean>(connection, getName(), row) {
|
||||
new RegionServerCallable<Boolean>(connection, getName(), row, put.getPriority()) {
|
||||
@Override
|
||||
public Boolean call(int callTimeout) throws IOException {
|
||||
HBaseRpcController controller = rpcControllerFactory.newController();
|
||||
controller.setPriority(tableName);
|
||||
controller.setPriority(getPriority());
|
||||
controller.setCallTimeout(callTimeout);
|
||||
try {
|
||||
CompareType compareType = CompareType.valueOf(compareOp.name());
|
||||
@ -1297,11 +1303,12 @@ public class HTable implements HTableInterface, RegionLocator {
|
||||
final Delete delete)
|
||||
throws IOException {
|
||||
RegionServerCallable<Boolean> callable =
|
||||
new RegionServerCallable<Boolean>(connection, getName(), row) {
|
||||
new RegionServerCallable<Boolean>(connection, getName(), row, delete.getPriority()) {
|
||||
@Override
|
||||
public Boolean call(int callTimeout) throws IOException {
|
||||
HBaseRpcController controller = rpcControllerFactory.newController();
|
||||
controller.setPriority(tableName);
|
||||
controller.setPriority(getPriority());
|
||||
controller.setCallTimeout(callTimeout);
|
||||
try {
|
||||
MutateRequest request = RequestConverter.buildMutateRequest(
|
||||
@ -1327,11 +1334,12 @@ public class HTable implements HTableInterface, RegionLocator {
|
||||
final Delete delete)
|
||||
throws IOException {
|
||||
RegionServerCallable<Boolean> callable =
|
||||
new RegionServerCallable<Boolean>(connection, getName(), row) {
|
||||
new RegionServerCallable<Boolean>(connection, getName(), row, delete.getPriority()) {
|
||||
@Override
|
||||
public Boolean call(int callTimeout) throws IOException {
|
||||
HBaseRpcController controller = rpcControllerFactory.newController();
|
||||
controller.setPriority(tableName);
|
||||
controller.setPriority(getPriority());
|
||||
controller.setCallTimeout(callTimeout);
|
||||
try {
|
||||
CompareType compareType = CompareType.valueOf(compareOp.name());
|
||||
@ -1364,6 +1372,7 @@ public class HTable implements HTableInterface, RegionLocator {
|
||||
public MultiResponse call(int callTimeout) throws IOException {
|
||||
controller.reset();
|
||||
controller.setPriority(tableName);
|
||||
controller.setPriority(rm.getMaxPriority());
|
||||
int remainingTime = tracker.getRemainingTime(callTimeout);
|
||||
if (remainingTime == 0) {
|
||||
throw new DoNotRetryIOException("Timeout for mutate row");
|
||||
|
@ -86,6 +86,7 @@ public class Increment extends Mutation implements Comparable<Row> {
|
||||
for (Map.Entry<String, byte[]> entry : i.getAttributesMap().entrySet()) {
|
||||
this.setAttribute(entry.getKey(), entry.getValue());
|
||||
}
|
||||
super.setPriority(i.getPriority());
|
||||
}
|
||||
|
||||
/**
|
||||
@ -351,4 +352,9 @@ public class Increment extends Mutation implements Comparable<Row> {
|
||||
public Increment setTTL(long ttl) {
|
||||
return (Increment) super.setTTL(ttl);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Increment setPriority(int priority) {
|
||||
return (Increment) super.setPriority(priority);
|
||||
}
|
||||
}
|
||||
|
@ -104,4 +104,14 @@ public final class MultiAction<R> {
|
||||
public long getNonceGroup() {
|
||||
return this.nonceGroup;
|
||||
}
|
||||
|
||||
public int getPriority() {
|
||||
int maxPriority = HConstants.PRIORITY_UNSET;
|
||||
for (List<Action<R>> actionList : actions.values()) {
|
||||
for (Action<R> action : actionList) {
|
||||
maxPriority = Math.max(maxPriority, action.getPriority());
|
||||
}
|
||||
}
|
||||
return maxPriority;
|
||||
}
|
||||
}
|
||||
|
@ -57,8 +57,8 @@ class MultiServerCallable<R> extends PayloadCarryingServerCallable<MultiResponse
|
||||
|
||||
MultiServerCallable(final ClusterConnection connection, final TableName tableName,
|
||||
final ServerName location, RpcControllerFactory rpcFactory, final MultiAction<R> multi,
|
||||
int rpcTimeout, RetryingTimeTracker tracker) {
|
||||
super(connection, tableName, null, rpcFactory);
|
||||
int rpcTimeout, RetryingTimeTracker tracker, int priority) {
|
||||
super(connection, tableName, null, rpcFactory, priority);
|
||||
this.multiAction = multi;
|
||||
// RegionServerCallable has HRegionLocation field, but this is a multi-region request.
|
||||
// Using region info from parent HRegionLocation would be a mistake for this class; so
|
||||
@ -130,6 +130,7 @@ class MultiServerCallable<R> extends PayloadCarryingServerCallable<MultiResponse
|
||||
controller.reset();
|
||||
if (cells != null) controller.setCellScanner(CellUtil.createCellScanner(cells));
|
||||
controller.setPriority(getTableName());
|
||||
controller.setPriority(getPriority());
|
||||
controller.setCallTimeout(callTimeout);
|
||||
ClientProtos.MultiResponse responseProto;
|
||||
ClientProtos.MultiRequest requestProto = multiRequestBuilder.build();
|
||||
|
@ -71,7 +71,10 @@ public abstract class Mutation extends OperationWithAttributes implements Row, C
|
||||
// familyMap
|
||||
ClassSize.REFERENCE +
|
||||
// familyMap
|
||||
ClassSize.TREEMAP);
|
||||
ClassSize.TREEMAP +
|
||||
// priority
|
||||
ClassSize.INTEGER
|
||||
);
|
||||
|
||||
/**
|
||||
* The attribute for storing the list of clusters that have consumed the change.
|
||||
|
@ -19,6 +19,7 @@
|
||||
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
@ -36,6 +37,7 @@ public abstract class OperationWithAttributes extends Operation implements Attri
|
||||
|
||||
// used for uniquely identifying an operation
|
||||
public static final String ID_ATRIBUTE = "_operation.attributes.id";
|
||||
private int priority = HConstants.PRIORITY_UNSET;
|
||||
|
||||
@Override
|
||||
public OperationWithAttributes setAttribute(String name, byte[] value) {
|
||||
@ -110,4 +112,13 @@ public abstract class OperationWithAttributes extends Operation implements Attri
|
||||
byte[] attr = getAttribute(ID_ATRIBUTE);
|
||||
return attr == null? null: Bytes.toString(attr);
|
||||
}
|
||||
|
||||
public OperationWithAttributes setPriority(int priority) {
|
||||
this.priority = priority;
|
||||
return this;
|
||||
}
|
||||
|
||||
public int getPriority() {
|
||||
return priority;
|
||||
}
|
||||
}
|
||||
|
@ -16,6 +16,7 @@
|
||||
*/
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
|
||||
@ -31,8 +32,13 @@ public abstract class PayloadCarryingServerCallable<T>
|
||||
protected HBaseRpcController controller;
|
||||
|
||||
public PayloadCarryingServerCallable(Connection connection, TableName tableName, byte[] row,
|
||||
RpcControllerFactory rpcControllerFactory) {
|
||||
super(connection, tableName, row);
|
||||
RpcControllerFactory rpcControllerFactory) {
|
||||
this(connection, tableName, row, rpcControllerFactory, HConstants.NORMAL_QOS);
|
||||
}
|
||||
|
||||
public PayloadCarryingServerCallable(Connection connection, TableName tableName, byte[] row,
|
||||
RpcControllerFactory rpcControllerFactory, int priority) {
|
||||
super(connection, tableName, row, priority);
|
||||
this.controller = rpcControllerFactory.newController();
|
||||
}
|
||||
|
||||
|
@ -23,6 +23,7 @@ import java.io.IOException;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.HRegionLocation;
|
||||
@ -50,6 +51,7 @@ public abstract class RegionServerCallable<T> implements RetryingCallable<T> {
|
||||
protected final byte[] row;
|
||||
protected HRegionLocation location;
|
||||
private ClientService.BlockingInterface stub;
|
||||
protected int priority;
|
||||
|
||||
/**
|
||||
* @param connection Connection to use.
|
||||
@ -57,9 +59,14 @@ public abstract class RegionServerCallable<T> implements RetryingCallable<T> {
|
||||
* @param row The row we want in <code>tableName</code>.
|
||||
*/
|
||||
public RegionServerCallable(Connection connection, TableName tableName, byte [] row) {
|
||||
this(connection, tableName, row, HConstants.NORMAL_QOS);
|
||||
}
|
||||
|
||||
public RegionServerCallable(Connection connection, TableName tableName, byte [] row, int priority) {
|
||||
this.connection = connection;
|
||||
this.tableName = tableName;
|
||||
this.row = row;
|
||||
this.priority = priority;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -117,6 +124,10 @@ public abstract class RegionServerCallable<T> implements RetryingCallable<T> {
|
||||
return this.row;
|
||||
}
|
||||
|
||||
public int getPriority() {
|
||||
return priority;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void throwable(Throwable t, boolean retrying) {
|
||||
if (location != null) {
|
||||
|
@ -114,4 +114,12 @@ public class RowMutations implements Row {
|
||||
public List<Mutation> getMutations() {
|
||||
return Collections.unmodifiableList(mutations);
|
||||
}
|
||||
|
||||
public int getMaxPriority() {
|
||||
int maxPriority = Integer.MIN_VALUE;
|
||||
for (Mutation mutation : mutations) {
|
||||
maxPriority = Math.max(maxPriority, mutation.getPriority());
|
||||
}
|
||||
return maxPriority;
|
||||
}
|
||||
}
|
||||
|
@ -38,6 +38,7 @@ import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||
import org.apache.hadoop.hbase.HBaseIOException;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionLocation;
|
||||
import org.apache.hadoop.hbase.RegionLocations;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
@ -100,7 +101,7 @@ public class RpcRetryingCallerWithReadReplicas {
|
||||
|
||||
public ReplicaRegionServerCallable(int id, HRegionLocation location) {
|
||||
super(RpcRetryingCallerWithReadReplicas.this.cConnection,
|
||||
RpcRetryingCallerWithReadReplicas.this.tableName, get.getRow());
|
||||
RpcRetryingCallerWithReadReplicas.this.tableName, get.getRow(), HConstants.PRIORITY_UNSET);
|
||||
this.id = id;
|
||||
this.location = location;
|
||||
this.controller = rpcControllerFactory.newController();
|
||||
|
@ -278,6 +278,7 @@ public class Scan extends Query {
|
||||
this.mvccReadPoint = scan.getMvccReadPoint();
|
||||
this.limit = scan.getLimit();
|
||||
this.needCursorResult = scan.isNeedCursorResult();
|
||||
setPriority(scan.getPriority());
|
||||
}
|
||||
|
||||
/**
|
||||
@ -307,6 +308,7 @@ public class Scan extends Query {
|
||||
setColumnFamilyTimeRange(entry.getKey(), tr.getMin(), tr.getMax());
|
||||
}
|
||||
this.mvccReadPoint = -1L;
|
||||
setPriority(get.getPriority());
|
||||
}
|
||||
|
||||
public boolean isGetScan() {
|
||||
@ -1088,6 +1090,11 @@ public class Scan extends Query {
|
||||
return (Scan) super.setIsolationLevel(level);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Scan setPriority(int priority) {
|
||||
return (Scan) super.setPriority(priority);
|
||||
}
|
||||
|
||||
/**
|
||||
* Utility that creates a Scan that will do a small scan in reverse from passed row
|
||||
* looking for next closest row.
|
||||
|
@ -131,7 +131,7 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
|
||||
*/
|
||||
public ScannerCallable (ClusterConnection connection, TableName tableName, Scan scan,
|
||||
ScanMetrics scanMetrics, RpcControllerFactory rpcControllerFactory, int id) {
|
||||
super(connection, tableName, scan.getStartRow());
|
||||
super(connection, tableName, scan.getStartRow(), scan.getPriority());
|
||||
this.id = id;
|
||||
this.cConnection = connection;
|
||||
this.scan = scan;
|
||||
|
@ -37,8 +37,6 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
@InterfaceAudience.Private
|
||||
public interface HBaseRpcController extends RpcController, CellScannable {
|
||||
|
||||
static final int PRIORITY_UNSET = -1;
|
||||
|
||||
/**
|
||||
* Only used to send cells to rpc server, the returned cells should be set by
|
||||
* {@link #setDone(CellScanner)}.
|
||||
|
@ -56,7 +56,7 @@ public class HBaseRpcControllerImpl implements HBaseRpcController {
|
||||
* This is the ordained way of setting priorities going forward. We will be undoing the old
|
||||
* annotation-based mechanism.
|
||||
*/
|
||||
private int priority = PRIORITY_UNSET;
|
||||
private int priority = HConstants.PRIORITY_UNSET;
|
||||
|
||||
/**
|
||||
* They are optionally set on construction, cleared after we make the call, and then optionally
|
||||
@ -95,7 +95,7 @@ public class HBaseRpcControllerImpl implements HBaseRpcController {
|
||||
|
||||
@Override
|
||||
public void setPriority(int priority) {
|
||||
this.priority = priority;
|
||||
this.priority = Math.max(this.priority, priority);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -106,7 +106,7 @@ public class HBaseRpcControllerImpl implements HBaseRpcController {
|
||||
|
||||
@Override
|
||||
public int getPriority() {
|
||||
return priority;
|
||||
return priority < 0 ? HConstants.NORMAL_QOS : priority;
|
||||
}
|
||||
|
||||
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",
|
||||
|
@ -29,6 +29,7 @@ import java.net.SocketTimeoutException;
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.CellBlockMeta;
|
||||
@ -111,7 +112,7 @@ class IPCUtil {
|
||||
builder.setCellBlockMeta(cellBlockMeta);
|
||||
}
|
||||
// Only pass priority if there is one set.
|
||||
if (call.priority != HBaseRpcController.PRIORITY_UNSET) {
|
||||
if (call.priority != HConstants.PRIORITY_UNSET) {
|
||||
builder.setPriority(call.priority);
|
||||
}
|
||||
builder.setTimeout(call.timeout);
|
||||
|
@ -26,6 +26,7 @@ import java.io.IOException;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.client.ClusterConnection;
|
||||
@ -84,7 +85,7 @@ public class RegionCoprocessorRpcChannel extends CoprocessorRpcChannel{
|
||||
final ClientProtos.CoprocessorServiceCall call =
|
||||
CoprocessorRpcUtils.buildServiceCall(row, method, request);
|
||||
RegionServerCallable<CoprocessorServiceResponse> callable =
|
||||
new RegionServerCallable<CoprocessorServiceResponse>(connection, table, row) {
|
||||
new RegionServerCallable<CoprocessorServiceResponse>(connection, table, row, HConstants.PRIORITY_UNSET) {
|
||||
@Override
|
||||
public CoprocessorServiceResponse call(int callTimeout) throws Exception {
|
||||
if (rpcController instanceof HBaseRpcController) {
|
||||
|
@ -1114,6 +1114,7 @@ public final class HConstants {
|
||||
* handled by high priority handlers.
|
||||
*/
|
||||
// normal_QOS < QOS_threshold < replication_QOS < replay_QOS < admin_QOS < high_QOS
|
||||
public static final int PRIORITY_UNSET = -1;
|
||||
public static final int NORMAL_QOS = 0;
|
||||
public static final int QOS_THRESHOLD = 10;
|
||||
public static final int HIGH_QOS = 200;
|
||||
|
@ -20,13 +20,16 @@ package org.apache.hadoop.hbase.client;
|
||||
import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import com.google.common.collect.ConcurrentHashMultiset;
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import com.google.common.collect.Multiset;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.CellScannable;
|
||||
import org.apache.hadoop.hbase.CellScanner;
|
||||
@ -72,6 +75,7 @@ public class TestRpcControllerFactory {
|
||||
|
||||
public static class CountingRpcController extends DelegatingHBaseRpcController {
|
||||
|
||||
private static Multiset<Integer> GROUPED_PRIORITY = ConcurrentHashMultiset.create();
|
||||
private static AtomicInteger INT_PRIORITY = new AtomicInteger();
|
||||
private static AtomicInteger TABLE_PRIORITY = new AtomicInteger();
|
||||
|
||||
@ -81,8 +85,13 @@ public class TestRpcControllerFactory {
|
||||
|
||||
@Override
|
||||
public void setPriority(int priority) {
|
||||
int oldPriority = getPriority();
|
||||
super.setPriority(priority);
|
||||
INT_PRIORITY.incrementAndGet();
|
||||
int newPriority = getPriority();
|
||||
if (newPriority != oldPriority) {
|
||||
INT_PRIORITY.incrementAndGet();
|
||||
GROUPED_PRIORITY.add(priority);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -189,6 +198,14 @@ public class TestRpcControllerFactory {
|
||||
scanInfo.setSmall(false);
|
||||
counter = doScan(table, scanInfo, counter);
|
||||
|
||||
// make sure we have no priority count
|
||||
verifyPriorityGroupCount(HConstants.ADMIN_QOS, 0);
|
||||
// lets set a custom priority on a get
|
||||
Get get = new Get(row);
|
||||
get.setPriority(HConstants.ADMIN_QOS);
|
||||
table.get(get);
|
||||
verifyPriorityGroupCount(HConstants.ADMIN_QOS, 1);
|
||||
|
||||
table.close();
|
||||
}
|
||||
|
||||
@ -200,9 +217,13 @@ public class TestRpcControllerFactory {
|
||||
}
|
||||
|
||||
int verifyCount(Integer counter) {
|
||||
assertEquals(counter.intValue(), CountingRpcController.TABLE_PRIORITY.get());
|
||||
assertTrue(CountingRpcController.TABLE_PRIORITY.get() >= counter);
|
||||
assertEquals(0, CountingRpcController.INT_PRIORITY.get());
|
||||
return counter + 1;
|
||||
return CountingRpcController.TABLE_PRIORITY.get() + 1;
|
||||
}
|
||||
|
||||
void verifyPriorityGroupCount(int priorityLevel, int count) {
|
||||
assertEquals(count, CountingRpcController.GROUPED_PRIORITY.count(priorityLevel));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -377,6 +377,7 @@ public class TestHeapSize {
|
||||
expected = ClassSize.estimateBase(cl, false);
|
||||
//The actual TreeMap is not included in the above calculation
|
||||
expected += ClassSize.align(ClassSize.TREEMAP);
|
||||
expected += ClassSize.align(ClassSize.INTEGER); // priority
|
||||
if (expected != actual) {
|
||||
ClassSize.estimateBase(cl, true);
|
||||
assertEquals(expected, actual);
|
||||
@ -387,6 +388,7 @@ public class TestHeapSize {
|
||||
expected = ClassSize.estimateBase(cl, false);
|
||||
//The actual TreeMap is not included in the above calculation
|
||||
expected += ClassSize.align(ClassSize.TREEMAP);
|
||||
expected += ClassSize.align(ClassSize.INTEGER); // priority
|
||||
if (expected != actual) {
|
||||
ClassSize.estimateBase(cl, true);
|
||||
assertEquals(expected, actual);
|
||||
|
Loading…
x
Reference in New Issue
Block a user