HBASE-15816 Provide client with ability to set priority on Operations

Signed-off-by: Andrew Purtell <apurtell@apache.org>
This commit is contained in:
rgidwani 2017-07-14 10:18:26 -07:00 committed by Andrew Purtell
parent 946289113a
commit d461bec6c2
38 changed files with 178 additions and 50 deletions

View File

@ -32,10 +32,16 @@ public class Action implements Comparable<Action> {
private final int originalIndex;
private long nonce = HConstants.NO_NONCE;
private int replicaId = RegionReplicaUtil.DEFAULT_REPLICA_ID;
private int priority;
public Action(Row action, int originalIndex) {
this(action, originalIndex, HConstants.PRIORITY_UNSET);
}
public Action(Row action, int originalIndex, int priority) {
this.action = action;
this.originalIndex = originalIndex;
this.priority = priority;
}
/**
@ -70,6 +76,8 @@ public class Action implements Comparable<Action> {
return replicaId;
}
public int getPriority() { return priority; }
@Override
public int compareTo(Action other) {
return action.compareTo(other.getAction());

View File

@ -84,6 +84,7 @@ public class Append extends Mutation {
for (Map.Entry<String, byte[]> entry : a.getAttributesMap().entrySet()) {
this.setAttribute(entry.getKey(), entry.getValue());
}
this.setPriority(a.getPriority());
}
/** Create a Append operation for the specified row.
@ -183,6 +184,11 @@ public class Append extends Mutation {
return (Append) super.setACL(perms);
}
@Override
public Append setPriority(int priority) {
return (Append) super.setPriority(priority);
}
@Override
public Append setTTL(long ttl) {
return (Append) super.setTTL(ttl);

View File

@ -291,7 +291,12 @@ class AsyncProcess {
LOG.error("Failed to get region location ", ex);
// This action failed before creating ars. Retain it, but do not add to submit list.
// We will then add it to ars in an already-failed state.
retainedActions.add(new Action(r, ++posInList));
int priority = HConstants.NORMAL_QOS;
if (r instanceof Mutation) {
priority = ((Mutation) r).getPriority();
}
retainedActions.add(new Action(r, ++posInList, priority));
locationErrors.add(ex);
locationErrorRows.add(posInList);
it.remove();
@ -302,7 +307,11 @@ class AsyncProcess {
break;
}
if (code == ReturnCode.INCLUDE) {
Action action = new Action(r, ++posInList);
int priority = HConstants.NORMAL_QOS;
if (r instanceof Mutation) {
priority = ((Mutation) r).getPriority();
}
Action action = new Action(r, ++posInList, priority);
setNonce(ng, r, action);
retainedActions.add(action);
// TODO: replica-get is not supported on this path
@ -372,6 +381,7 @@ class AsyncProcess {
// The position will be used by the processBatch to match the object array returned.
int posInList = -1;
NonceGenerator ng = this.connection.getNonceGenerator();
int highestPriority = HConstants.PRIORITY_UNSET;
for (Row r : rows) {
posInList++;
if (r instanceof Put) {
@ -379,8 +389,9 @@ class AsyncProcess {
if (put.isEmpty()) {
throw new IllegalArgumentException("No columns to insert for #" + (posInList+1)+ " item");
}
highestPriority = Math.max(put.getPriority(), highestPriority);
}
Action action = new Action(r, posInList);
Action action = new Action(r, posInList, highestPriority);
setNonce(ng, r, action);
actions.add(action);
}

View File

@ -1267,6 +1267,6 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
private MultiServerCallable createCallable(final ServerName server, TableName tableName,
final MultiAction multi) {
return new MultiServerCallable(asyncProcess.connection, tableName, server,
multi, asyncProcess.rpcFactory.newController(), rpcTimeout, tracker);
multi, asyncProcess.rpcFactory.newController(), rpcTimeout, tracker, multi.getPriority());
}
}

View File

@ -40,8 +40,8 @@ abstract class CancellableRegionServerCallable<T> extends ClientServiceCallable<
private final RetryingTimeTracker tracker;
private final int rpcTimeout;
CancellableRegionServerCallable(Connection connection, TableName tableName, byte[] row,
RpcController rpcController, int rpcTimeout, RetryingTimeTracker tracker) {
super(connection, tableName, row, rpcController);
RpcController rpcController, int rpcTimeout, RetryingTimeTracker tracker, int priority) {
super(connection, tableName, row, rpcController, priority);
this.rpcTimeout = rpcTimeout;
this.tracker = tracker;
}

View File

@ -33,9 +33,10 @@ import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController;
@InterfaceAudience.Private
public abstract class ClientServiceCallable<T> extends
RegionServerCallable<T, ClientProtos.ClientService.BlockingInterface> {
public ClientServiceCallable(Connection connection, TableName tableName, byte [] row,
RpcController rpcController) {
super(connection, tableName, row, rpcController);
RpcController rpcController, int priority) {
super(connection, tableName, row, rpcController, priority);
}
@Override

View File

@ -147,6 +147,7 @@ public class Delete extends Mutation implements Comparable<Row> {
for (Map.Entry<String, byte[]> entry : d.getAttributesMap().entrySet()) {
this.setAttribute(entry.getKey(), entry.getValue());
}
super.setPriority(d.getPriority());
}
/**
@ -369,4 +370,9 @@ public class Delete extends Mutation implements Comparable<Row> {
public Delete setTTL(long ttl) {
throw new UnsupportedOperationException("Setting TTLs on Deletes is not supported");
}
@Override
public Delete setPriority(int priority) {
return (Delete) super.setPriority(priority);
}
}

View File

@ -127,6 +127,7 @@ public class Get extends Query
TimeRange tr = entry.getValue();
setColumnFamilyTimeRange(entry.getKey(), tr.getMin(), tr.getMax());
}
super.setPriority(get.getPriority());
}
/**
@ -552,4 +553,8 @@ public class Get extends Query
return (Get) super.setIsolationLevel(level);
}
@Override
public Get setPriority(int priority) {
return (Get) super.setPriority(priority);
}
}

View File

@ -415,7 +415,7 @@ public class HTable implements Table {
if (get.getConsistency() == Consistency.STRONG) {
final Get configuredGet = get;
ClientServiceCallable<Result> callable = new ClientServiceCallable<Result>(this.connection, getName(),
get.getRow(), this.rpcControllerFactory.newController()) {
get.getRow(), this.rpcControllerFactory.newController(), get.getPriority()) {
@Override
protected Result rpcCall() throws Exception {
ClientProtos.GetRequest request = RequestConverter.buildGetRequest(
@ -547,7 +547,7 @@ public class HTable implements Table {
CancellableRegionServerCallable<SingleResponse> callable =
new CancellableRegionServerCallable<SingleResponse>(
connection, getName(), delete.getRow(), this.rpcControllerFactory.newController(),
writeRpcTimeout, new RetryingTimeTracker().start()) {
writeRpcTimeout, new RetryingTimeTracker().start(), delete.getPriority()) {
@Override
protected SingleResponse rpcCall() throws Exception {
MutateRequest request = RequestConverter.buildMutateRequest(
@ -624,7 +624,7 @@ public class HTable implements Table {
public void mutateRow(final RowMutations rm) throws IOException {
CancellableRegionServerCallable<MultiResponse> callable =
new CancellableRegionServerCallable<MultiResponse>(this.connection, getName(), rm.getRow(),
rpcControllerFactory.newController(), writeRpcTimeout, new RetryingTimeTracker().start()){
rpcControllerFactory.newController(), writeRpcTimeout, new RetryingTimeTracker().start(), rm.getMaxPriority()){
@Override
protected MultiResponse rpcCall() throws Exception {
RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction(
@ -668,7 +668,7 @@ public class HTable implements Table {
checkHasFamilies(append);
NoncedRegionServerCallable<Result> callable =
new NoncedRegionServerCallable<Result>(this.connection, getName(), append.getRow(),
this.rpcControllerFactory.newController()) {
this.rpcControllerFactory.newController(), append.getPriority()) {
@Override
protected Result rpcCall() throws Exception {
MutateRequest request = RequestConverter.buildMutateRequest(
@ -690,7 +690,7 @@ public class HTable implements Table {
checkHasFamilies(increment);
NoncedRegionServerCallable<Result> callable =
new NoncedRegionServerCallable<Result>(this.connection, getName(), increment.getRow(),
this.rpcControllerFactory.newController()) {
this.rpcControllerFactory.newController(), increment.getPriority()) {
@Override
protected Result rpcCall() throws Exception {
MutateRequest request = RequestConverter.buildMutateRequest(
@ -734,7 +734,7 @@ public class HTable implements Table {
NoncedRegionServerCallable<Long> callable =
new NoncedRegionServerCallable<Long>(this.connection, getName(), row,
this.rpcControllerFactory.newController()) {
this.rpcControllerFactory.newController(), HConstants.PRIORITY_UNSET) {
@Override
protected Long rpcCall() throws Exception {
MutateRequest request = RequestConverter.buildIncrementRequest(
@ -758,7 +758,7 @@ public class HTable implements Table {
final Put put)
throws IOException {
ClientServiceCallable<Boolean> callable = new ClientServiceCallable<Boolean>(this.connection, getName(), row,
this.rpcControllerFactory.newController()) {
this.rpcControllerFactory.newController(), put.getPriority()) {
@Override
protected Boolean rpcCall() throws Exception {
MutateRequest request = RequestConverter.buildMutateRequest(
@ -782,7 +782,7 @@ public class HTable implements Table {
throws IOException {
ClientServiceCallable<Boolean> callable =
new ClientServiceCallable<Boolean>(this.connection, getName(), row,
this.rpcControllerFactory.newController()) {
this.rpcControllerFactory.newController(), put.getPriority()) {
@Override
protected Boolean rpcCall() throws Exception {
CompareType compareType = CompareType.valueOf(compareOp.name());
@ -817,7 +817,7 @@ public class HTable implements Table {
CancellableRegionServerCallable<SingleResponse> callable =
new CancellableRegionServerCallable<SingleResponse>(
this.connection, getName(), row, this.rpcControllerFactory.newController(),
writeRpcTimeout, new RetryingTimeTracker().start()) {
writeRpcTimeout, new RetryingTimeTracker().start(), delete.getPriority()) {
@Override
protected SingleResponse rpcCall() throws Exception {
CompareType compareType = CompareType.valueOf(compareOp.name());
@ -858,7 +858,7 @@ public class HTable implements Table {
throws IOException {
CancellableRegionServerCallable<MultiResponse> callable =
new CancellableRegionServerCallable<MultiResponse>(connection, getName(), rm.getRow(),
rpcControllerFactory.newController(), writeRpcTimeout, new RetryingTimeTracker().start()) {
rpcControllerFactory.newController(), writeRpcTimeout, new RetryingTimeTracker().start(), rm.getMaxPriority()) {
@Override
protected MultiResponse rpcCall() throws Exception {
CompareType compareType = CompareType.valueOf(compareOp.name());

View File

@ -84,6 +84,7 @@ public class Increment extends Mutation implements Comparable<Row> {
for (Map.Entry<String, byte[]> entry : i.getAttributesMap().entrySet()) {
this.setAttribute(entry.getKey(), entry.getValue());
}
super.setPriority(i.getPriority());
}
/**
@ -331,4 +332,9 @@ public class Increment extends Mutation implements Comparable<Row> {
public Increment setTTL(long ttl) {
return (Increment) super.setTTL(ttl);
}
@Override
public Increment setPriority(int priority) {
return (Increment) super.setPriority(priority);
}
}

View File

@ -20,11 +20,16 @@ package org.apache.hadoop.hbase.client;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.function.Consumer;
import java.util.function.Predicate;
import com.google.common.collect.Iterables;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.Bytes;
@ -103,4 +108,11 @@ public final class MultiAction {
public long getNonceGroup() {
return this.nonceGroup;
}
// returns the max priority of all the actions
public int getPriority() {
Optional<Action> result = actions.values().stream().flatMap(List::stream)
.max((action1, action2) -> Math.max(action1.getPriority(), action2.getPriority()));
return result.isPresent() ? result.get().getPriority() : HConstants.PRIORITY_UNSET;
}
}

View File

@ -55,8 +55,8 @@ class MultiServerCallable extends CancellableRegionServerCallable<MultiResponse>
MultiServerCallable(final ClusterConnection connection, final TableName tableName,
final ServerName location, final MultiAction multi, RpcController rpcController,
int rpcTimeout, RetryingTimeTracker tracker) {
super(connection, tableName, null, rpcController, rpcTimeout, tracker);
int rpcTimeout, RetryingTimeTracker tracker, int priority) {
super(connection, tableName, null, rpcController, rpcTimeout, tracker, priority);
this.multiAction = multi;
// RegionServerCallable has HRegionLocation field, but this is a multi-region request.
// Using region info from parent HRegionLocation would be a mistake for this class; so

View File

@ -71,7 +71,10 @@ public abstract class Mutation extends OperationWithAttributes implements Row, C
// familyMap
ClassSize.REFERENCE +
// familyMap
ClassSize.TREEMAP);
ClassSize.TREEMAP +
// priority
ClassSize.INTEGER
);
/**
* The attribute for storing the list of clusters that have consumed the change.

View File

@ -47,8 +47,8 @@ public abstract class NoncedRegionServerCallable<T> extends ClientServiceCallabl
* @param row The row we want in <code>tableName</code>.
*/
public NoncedRegionServerCallable(Connection connection, TableName tableName, byte [] row,
HBaseRpcController rpcController) {
super(connection, tableName, row, rpcController);
HBaseRpcController rpcController, int priority) {
super(connection, tableName, row, rpcController, priority);
this.nonce = getConnection().getNonceGenerator().newNonce();
}

View File

@ -23,6 +23,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
@ -34,6 +35,7 @@ public abstract class OperationWithAttributes extends Operation implements Attri
// used for uniquely identifying an operation
public static final String ID_ATRIBUTE = "_operation.attributes.id";
private int priority = HConstants.PRIORITY_UNSET;
@Override
public OperationWithAttributes setAttribute(String name, byte[] value) {
@ -108,4 +110,14 @@ public abstract class OperationWithAttributes extends Operation implements Attri
byte[] attr = getAttribute(ID_ATRIBUTE);
return attr == null? null: Bytes.toString(attr);
}
public OperationWithAttributes setPriority(int priority) {
this.priority = priority;
return this;
}
public int getPriority() {
return priority;
}
}

View File

@ -21,6 +21,7 @@ import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
@ -77,7 +78,7 @@ class RegionCoprocessorRpcChannel extends SyncCoprocessorRpcChannel {
}
ClientServiceCallable<CoprocessorServiceResponse> callable =
new ClientServiceCallable<CoprocessorServiceResponse>(this.conn,
this.table, this.row, this.conn.getRpcControllerFactory().newController()) {
this.table, this.row, this.conn.getRpcControllerFactory().newController(), HConstants.PRIORITY_UNSET) {
@Override
protected CoprocessorServiceResponse rpcCall() throws Exception {
byte [] regionName = getLocation().getRegionInfo().getRegionName();

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.client;
import java.io.IOException;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ServerName;
@ -66,6 +67,7 @@ public abstract class RegionServerCallable<T, S> implements RetryingCallable<T>
* Can be null!
*/
protected final RpcController rpcController;
private int priority = HConstants.NORMAL_QOS;
/**
* @param connection Connection to use.
@ -75,11 +77,17 @@ public abstract class RegionServerCallable<T, S> implements RetryingCallable<T>
*/
public RegionServerCallable(Connection connection, TableName tableName, byte [] row,
RpcController rpcController) {
this(connection, tableName, row, rpcController, HConstants.NORMAL_QOS);
}
public RegionServerCallable(Connection connection, TableName tableName, byte [] row,
RpcController rpcController, int priority) {
super();
this.connection = connection;
this.tableName = tableName;
this.row = row;
this.rpcController = rpcController;
this.priority = priority;
}
protected RpcController getRpcController() {
@ -111,6 +119,7 @@ public abstract class RegionServerCallable<T, S> implements RetryingCallable<T>
// If it is an instance of HBaseRpcController, we can set priority on the controller based
// off the tableName. Set call timeout too.
hrc.setPriority(tableName);
hrc.setPriority(priority);
hrc.setCallTimeout(callTimeout);
}
}
@ -172,6 +181,8 @@ public abstract class RegionServerCallable<T, S> implements RetryingCallable<T>
return this.row;
}
protected int getPriority() { return this.priority;}
public void throwable(Throwable t, boolean retrying) {
if (location != null) {
getConnection().updateCachedLocations(tableName, location.getRegionInfo().getRegionName(),

View File

@ -118,4 +118,12 @@ public class RowMutations implements Row {
public List<Mutation> getMutations() {
return Collections.unmodifiableList(mutations);
}
public int getMaxPriority() {
int maxPriority = Integer.MIN_VALUE;
for (Mutation mutation : mutations) {
maxPriority = Math.max(maxPriority, mutation.getPriority());
}
return maxPriority;
}
}

View File

@ -27,7 +27,6 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -45,6 +44,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import static org.apache.hadoop.hbase.HConstants.PRIORITY_UNSET;
/**
* Caller that goes to replica if the primary region does no answer within a configurable
@ -96,7 +96,7 @@ public class RpcRetryingCallerWithReadReplicas {
public ReplicaRegionServerCallable(int id, HRegionLocation location) {
super(RpcRetryingCallerWithReadReplicas.this.cConnection,
RpcRetryingCallerWithReadReplicas.this.tableName, get.getRow(),
rpcControllerFactory.newController(), rpcTimeout, new RetryingTimeTracker());
rpcControllerFactory.newController(), rpcTimeout, new RetryingTimeTracker(), PRIORITY_UNSET);
this.id = id;
this.location = location;
}

View File

@ -276,6 +276,7 @@ public class Scan extends Query {
this.mvccReadPoint = scan.getMvccReadPoint();
this.limit = scan.getLimit();
this.needCursorResult = scan.isNeedCursorResult();
setPriority(scan.getPriority());
}
/**
@ -306,6 +307,7 @@ public class Scan extends Query {
setColumnFamilyTimeRange(entry.getKey(), tr.getMin(), tr.getMax());
}
this.mvccReadPoint = -1L;
setPriority(get.getPriority());
}
public boolean isGetScan() {
@ -1060,6 +1062,11 @@ public class Scan extends Query {
return (Scan) super.setIsolationLevel(level);
}
@Override
public Scan setPriority(int priority) {
return (Scan) super.setPriority(priority);
}
/**
* Enable collection of {@link ScanMetrics}. For advanced users.
* @param enabled Set to true to enable accumulating scan metrics

View File

@ -117,7 +117,7 @@ public class ScannerCallable extends ClientServiceCallable<Result[]> {
*/
public ScannerCallable(ClusterConnection connection, TableName tableName, Scan scan,
ScanMetrics scanMetrics, RpcControllerFactory rpcControllerFactory, int id) {
super(connection, tableName, scan.getStartRow(), rpcControllerFactory.newController());
super(connection, tableName, scan.getStartRow(), rpcControllerFactory.newController(), scan.getPriority());
this.id = id;
this.scan = scan;
this.scanMetrics = scanMetrics;

View File

@ -22,7 +22,6 @@ import java.io.IOException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
@ -39,6 +38,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpeci
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
import org.apache.hadoop.security.token.Token;
import static org.apache.hadoop.hbase.HConstants.PRIORITY_UNSET;
/**
* Client proxy for SecureBulkLoadProtocol
*/
@ -56,7 +57,7 @@ public class SecureBulkLoadClient {
try {
ClientServiceCallable<String> callable = new ClientServiceCallable<String>(conn,
table.getName(), HConstants.EMPTY_START_ROW,
this.rpcControllerFactory.newController()) {
this.rpcControllerFactory.newController(), PRIORITY_UNSET) {
@Override
protected String rpcCall() throws Exception {
byte[] regionName = getLocation().getRegionInfo().getRegionName();
@ -79,7 +80,7 @@ public class SecureBulkLoadClient {
public void cleanupBulkLoad(final Connection conn, final String bulkToken) throws IOException {
try {
ClientServiceCallable<Void> callable = new ClientServiceCallable<Void>(conn,
table.getName(), HConstants.EMPTY_START_ROW, this.rpcControllerFactory.newController()) {
table.getName(), HConstants.EMPTY_START_ROW, this.rpcControllerFactory.newController(), PRIORITY_UNSET) {
@Override
protected Void rpcCall() throws Exception {
byte[] regionName = getLocation().getRegionInfo().getRegionName();

View File

@ -37,8 +37,6 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
@InterfaceAudience.Private
public interface HBaseRpcController extends RpcController, CellScannable {
static final int PRIORITY_UNSET = -1;
/**
* Only used to send cells to rpc server, the returned cells should be set by
* {@link #setDone(CellScanner)}.

View File

@ -56,7 +56,7 @@ public class HBaseRpcControllerImpl implements HBaseRpcController {
* This is the ordained way of setting priorities going forward. We will be undoing the old
* annotation-based mechanism.
*/
private int priority = PRIORITY_UNSET;
private int priority = HConstants.PRIORITY_UNSET;
/**
* They are optionally set on construction, cleared after we make the call, and then optionally
@ -95,7 +95,8 @@ public class HBaseRpcControllerImpl implements HBaseRpcController {
@Override
public void setPriority(int priority) {
this.priority = priority;
this.priority = Math.max(this.priority, priority);
}
@Override
@ -106,7 +107,7 @@ public class HBaseRpcControllerImpl implements HBaseRpcController {
@Override
public int getPriority() {
return priority;
return priority < 0 ? HConstants.NORMAL_QOS : priority;
}
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",

View File

@ -29,6 +29,7 @@ import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta;
@ -111,7 +112,7 @@ class IPCUtil {
builder.setCellBlockMeta(cellBlockMeta);
}
// Only pass priority if there is one set.
if (call.priority != HBaseRpcController.PRIORITY_UNSET) {
if (call.priority != HConstants.PRIORITY_UNSET) {
builder.setPriority(call.priority);
}
builder.setTimeout(call.timeout);

View File

@ -1113,6 +1113,7 @@ public final class HConstants {
* handled by high priority handlers.
*/
// normal_QOS < replication_QOS < replay_QOS < QOS_threshold < admin_QOS < high_QOS
public static final int PRIORITY_UNSET = -1;
public static final int NORMAL_QOS = 0;
public static final int REPLICATION_QOS = 5;
public static final int REPLAY_QOS = 6;

View File

@ -28,6 +28,8 @@ import java.io.IOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.curator.shaded.com.google.common.collect.ConcurrentHashMultiset;
import org.apache.curator.shaded.com.google.common.collect.Multiset;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellScannable;
import org.apache.hadoop.hbase.CellScanner;
@ -76,6 +78,7 @@ public class TestRpcControllerFactory {
public static class CountingRpcController extends DelegatingHBaseRpcController {
private static Multiset<Integer> GROUPED_PRIORITY = ConcurrentHashMultiset.create();
private static AtomicInteger INT_PRIORITY = new AtomicInteger();
private static AtomicInteger TABLE_PRIORITY = new AtomicInteger();
@ -85,8 +88,13 @@ public class TestRpcControllerFactory {
@Override
public void setPriority(int priority) {
int oldPriority = getPriority();
super.setPriority(priority);
INT_PRIORITY.incrementAndGet();
int newPriority = getPriority();
if (newPriority != oldPriority) {
INT_PRIORITY.incrementAndGet();
GROUPED_PRIORITY.add(priority);
}
}
@Override
@ -196,6 +204,14 @@ public class TestRpcControllerFactory {
scanInfo.setSmall(false);
counter = doScan(table, scanInfo, counter + 1);
// make sure we have no priority count
verifyPriorityGroupCount(HConstants.ADMIN_QOS, 0);
// lets set a custom priority on a get
Get get = new Get(row);
get.setPriority(HConstants.ADMIN_QOS);
table.get(get);
verifyPriorityGroupCount(HConstants.ADMIN_QOS, 1);
table.close();
connection.close();
}
@ -208,11 +224,15 @@ public class TestRpcControllerFactory {
}
int verifyCount(Integer counter) {
assertTrue(CountingRpcController.TABLE_PRIORITY.get() >= counter.intValue());
assertTrue(CountingRpcController.TABLE_PRIORITY.get() >= counter);
assertEquals(0, CountingRpcController.INT_PRIORITY.get());
return CountingRpcController.TABLE_PRIORITY.get() + 1;
}
void verifyPriorityGroupCount(int priorityLevel, int count) {
assertEquals(count, CountingRpcController.GROUPED_PRIORITY.count(priorityLevel));
}
@Test
public void testFallbackToDefaultRpcControllerFactory() {
Configuration conf = new Configuration(UTIL.getConfiguration());

View File

@ -27,6 +27,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
import org.apache.hadoop.hbase.TableName;
@ -108,7 +109,7 @@ public class TestHRegionServerBulkLoadWithOldSecureEndpoint extends TestHRegionS
RpcControllerFactory rpcControllerFactory = new RpcControllerFactory(UTIL.getConfiguration());
ClientServiceCallable<Void> callable =
new ClientServiceCallable<Void>(conn, tableName, Bytes.toBytes("aaa"),
rpcControllerFactory.newController()) {
rpcControllerFactory.newController(), HConstants.PRIORITY_UNSET) {
@Override
protected Void rpcCall() throws Exception {
LOG.debug("Going to connect to server " + getLocation() + " for row " +
@ -128,7 +129,7 @@ public class TestHRegionServerBulkLoadWithOldSecureEndpoint extends TestHRegionS
if (numBulkLoads.get() % 5 == 0) {
// 5 * 50 = 250 open file handles!
callable = new ClientServiceCallable<Void>(conn, tableName, Bytes.toBytes("aaa"),
rpcControllerFactory.newController()) {
rpcControllerFactory.newController(), HConstants.PRIORITY_UNSET) {
@Override
protected Void rpcCall() throws Exception {
LOG.debug("compacting " + getLocation() + " for row "

View File

@ -155,6 +155,9 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
public boolean dispatch(CallRunner callTask) throws InterruptedException {
RpcCall call = callTask.getRpcCall();
int level = priority.getPriority(call.getHeader(), call.getParam(), call.getRequestUser());
if (level == HConstants.PRIORITY_UNSET) {
level = HConstants.NORMAL_QOS;
}
if (priorityExecutor != null && level > highPriorityLevel) {
return priorityExecutor.dispatch(callTask);
} else if (replicationExecutor != null && level == HConstants.REPLICATION_QOS) {

View File

@ -530,7 +530,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
}
return new ClientServiceCallable<byte[]>(conn,
tableName, first, rpcControllerFactory.newController()) {
tableName, first, rpcControllerFactory.newController(), HConstants.PRIORITY_UNSET) {
@Override
protected byte[] rpcCall() throws Exception {
SecureBulkLoadClient secureClient = null;

View File

@ -183,7 +183,7 @@ public class WALEditsReplaySink {
ReplayServerCallable(final Connection connection, RpcControllerFactory rpcControllerFactory,
final TableName tableName, final HRegionLocation regionLoc, final List<Entry> entries) {
super(connection, tableName, HConstants.EMPTY_BYTE_ARRAY,
rpcControllerFactory.newController());
rpcControllerFactory.newController(), HConstants.PRIORITY_UNSET);
this.entries = entries;
setLocation(regionLoc);
}

View File

@ -668,7 +668,7 @@ public class TestHCM {
TEST_UTIL.createTable(tableName, FAM_NAM);
ClientServiceCallable<Object> regionServerCallable = new ClientServiceCallable<Object>(
TEST_UTIL.getConnection(), tableName, ROW,
new RpcControllerFactory(TEST_UTIL.getConfiguration()).newController()) {
new RpcControllerFactory(TEST_UTIL.getConfiguration()).newController(), HConstants.PRIORITY_UNSET) {
@Override
protected Object rpcCall() throws Exception {
return null;

View File

@ -475,7 +475,7 @@ public class TestReplicaWithCluster {
new SecureBulkLoadClient(HTU.getConfiguration(), table).prepareBulkLoad(conn);
ClientServiceCallable<Void> callable = new ClientServiceCallable<Void>(conn,
hdt.getTableName(), TestHRegionServerBulkLoad.rowkey(0),
new RpcControllerFactory(HTU.getConfiguration()).newController()) {
new RpcControllerFactory(HTU.getConfiguration()).newController(), HConstants.PRIORITY_UNSET) {
@Override
protected Void rpcCall() throws Exception {
LOG.debug("Going to connect to server " + getLocation() + " for row "

View File

@ -471,6 +471,7 @@ public class TestHeapSize {
expected = ClassSize.estimateBase(cl, false);
//The actual TreeMap is not included in the above calculation
expected += ClassSize.align(ClassSize.TREEMAP);
expected += ClassSize.align(ClassSize.INTEGER); // priority
if (expected != actual) {
ClassSize.estimateBase(cl, true);
assertEquals(expected, actual);
@ -481,6 +482,7 @@ public class TestHeapSize {
expected = ClassSize.estimateBase(cl, false);
//The actual TreeMap is not included in the above calculation
expected += ClassSize.align(ClassSize.TREEMAP);
expected += ClassSize.align(ClassSize.INTEGER); // priority
if (expected != actual) {
ClassSize.estimateBase(cl, true);
assertEquals(expected, actual);

View File

@ -354,7 +354,7 @@ public class TestLoadIncrementalHFilesSplitRecovery {
HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER) - 1) {
ClientServiceCallable<byte[]> newServerCallable = new ClientServiceCallable<byte[]>(
conn, tableName, first, new RpcControllerFactory(
util.getConfiguration()).newController()) {
util.getConfiguration()).newController(), HConstants.PRIORITY_UNSET) {
@Override
public byte[] rpcCall() throws Exception {
throw new IOException("Error calling something on RegionServer");

View File

@ -37,6 +37,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
@ -451,7 +452,7 @@ public class TestSpaceQuotas {
Table table = conn.getTable(tn);
final String bulkToken = new SecureBulkLoadClient(conf, table).prepareBulkLoad(conn);
return new ClientServiceCallable<Void>(conn,
tn, Bytes.toBytes("row"), new RpcControllerFactory(conf).newController()) {
tn, Bytes.toBytes("row"), new RpcControllerFactory(conf).newController(), HConstants.PRIORITY_UNSET) {
@Override
public Void rpcCall() throws Exception {
SecureBulkLoadClient secureClient = null;

View File

@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
@ -205,7 +206,7 @@ public class TestHRegionServerBulkLoad {
prepareBulkLoad(conn);
ClientServiceCallable<Void> callable = new ClientServiceCallable<Void>(conn,
tableName, Bytes.toBytes("aaa"),
new RpcControllerFactory(UTIL.getConfiguration()).newController()) {
new RpcControllerFactory(UTIL.getConfiguration()).newController(), HConstants.PRIORITY_UNSET) {
@Override
public Void rpcCall() throws Exception {
LOG.debug("Going to connect to server " + getLocation() + " for row "
@ -229,7 +230,7 @@ public class TestHRegionServerBulkLoad {
// 5 * 50 = 250 open file handles!
callable = new ClientServiceCallable<Void>(conn,
tableName, Bytes.toBytes("aaa"),
new RpcControllerFactory(UTIL.getConfiguration()).newController()) {
new RpcControllerFactory(UTIL.getConfiguration()).newController(), HConstants.PRIORITY_UNSET) {
@Override
protected Void rpcCall() throws Exception {
LOG.debug("compacting " + getLocation() + " for row "

View File

@ -27,6 +27,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
import org.apache.hadoop.hbase.TableName;
@ -94,7 +95,7 @@ public class TestHRegionServerBulkLoadWithOldClient extends TestHRegionServerBul
RpcControllerFactory rpcControllerFactory = new RpcControllerFactory(UTIL.getConfiguration());
ClientServiceCallable<Void> callable =
new ClientServiceCallable<Void>(conn, tableName,
Bytes.toBytes("aaa"), rpcControllerFactory.newController()) {
Bytes.toBytes("aaa"), rpcControllerFactory.newController(), HConstants.PRIORITY_UNSET) {
@Override
protected Void rpcCall() throws Exception {
LOG.info("Non-secure old client");
@ -114,7 +115,7 @@ public class TestHRegionServerBulkLoadWithOldClient extends TestHRegionServerBul
if (numBulkLoads.get() % 5 == 0) {
// 5 * 50 = 250 open file handles!
callable = new ClientServiceCallable<Void>(conn, tableName,
Bytes.toBytes("aaa"), rpcControllerFactory.newController()) {
Bytes.toBytes("aaa"), rpcControllerFactory.newController(), HConstants.PRIORITY_UNSET) {
@Override
protected Void rpcCall() throws Exception {
LOG.debug("compacting " + getLocation() + " for row "