diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Action.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Action.java index ef059122960..f4b696a2970 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Action.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Action.java @@ -32,10 +32,16 @@ public class Action implements Comparable { private final int originalIndex; private long nonce = HConstants.NO_NONCE; private int replicaId = RegionReplicaUtil.DEFAULT_REPLICA_ID; + private int priority; public Action(Row action, int originalIndex) { + this(action, originalIndex, HConstants.PRIORITY_UNSET); + } + + public Action(Row action, int originalIndex, int priority) { this.action = action; this.originalIndex = originalIndex; + this.priority = priority; } /** @@ -70,6 +76,8 @@ public class Action implements Comparable { return replicaId; } + public int getPriority() { return priority; } + @Override public int compareTo(Action other) { return action.compareTo(other.getAction()); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Append.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Append.java index 346eb0e4db3..02ec770de0e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Append.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Append.java @@ -84,6 +84,7 @@ public class Append extends Mutation { for (Map.Entry entry : a.getAttributesMap().entrySet()) { this.setAttribute(entry.getKey(), entry.getValue()); } + this.setPriority(a.getPriority()); } /** Create a Append operation for the specified row. @@ -183,6 +184,11 @@ public class Append extends Mutation { return (Append) super.setACL(perms); } + @Override + public Append setPriority(int priority) { + return (Append) super.setPriority(priority); + } + @Override public Append setTTL(long ttl) { return (Append) super.setTTL(ttl); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java index 22efdaa3608..8693b3c06d7 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java @@ -291,7 +291,12 @@ class AsyncProcess { LOG.error("Failed to get region location ", ex); // This action failed before creating ars. Retain it, but do not add to submit list. // We will then add it to ars in an already-failed state. - retainedActions.add(new Action(r, ++posInList)); + + int priority = HConstants.NORMAL_QOS; + if (r instanceof Mutation) { + priority = ((Mutation) r).getPriority(); + } + retainedActions.add(new Action(r, ++posInList, priority)); locationErrors.add(ex); locationErrorRows.add(posInList); it.remove(); @@ -302,7 +307,11 @@ class AsyncProcess { break; } if (code == ReturnCode.INCLUDE) { - Action action = new Action(r, ++posInList); + int priority = HConstants.NORMAL_QOS; + if (r instanceof Mutation) { + priority = ((Mutation) r).getPriority(); + } + Action action = new Action(r, ++posInList, priority); setNonce(ng, r, action); retainedActions.add(action); // TODO: replica-get is not supported on this path @@ -372,6 +381,7 @@ class AsyncProcess { // The position will be used by the processBatch to match the object array returned. 
int posInList = -1; NonceGenerator ng = this.connection.getNonceGenerator(); + int highestPriority = HConstants.PRIORITY_UNSET; for (Row r : rows) { posInList++; if (r instanceof Put) { @@ -379,8 +389,9 @@ class AsyncProcess { if (put.isEmpty()) { throw new IllegalArgumentException("No columns to insert for #" + (posInList+1)+ " item"); } + highestPriority = Math.max(put.getPriority(), highestPriority); } - Action action = new Action(r, posInList); + Action action = new Action(r, posInList, highestPriority); setNonce(ng, r, action); actions.add(action); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java index 710ec913361..5a5a3e3e626 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java @@ -1267,6 +1267,6 @@ class AsyncRequestFutureImpl implements AsyncRequestFuture { private MultiServerCallable createCallable(final ServerName server, TableName tableName, final MultiAction multi) { return new MultiServerCallable(asyncProcess.connection, tableName, server, - multi, asyncProcess.rpcFactory.newController(), rpcTimeout, tracker); + multi, asyncProcess.rpcFactory.newController(), rpcTimeout, tracker, multi.getPriority()); } } \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CancellableRegionServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CancellableRegionServerCallable.java index a0ff9005f48..c0e64e33436 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CancellableRegionServerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CancellableRegionServerCallable.java @@ -40,8 +40,8 @@ abstract class CancellableRegionServerCallable extends ClientServiceCallable< private final RetryingTimeTracker tracker; private final int rpcTimeout; CancellableRegionServerCallable(Connection connection, TableName tableName, byte[] row, - RpcController rpcController, int rpcTimeout, RetryingTimeTracker tracker) { - super(connection, tableName, row, rpcController); + RpcController rpcController, int rpcTimeout, RetryingTimeTracker tracker, int priority) { + super(connection, tableName, row, rpcController, priority); this.rpcTimeout = rpcTimeout; this.tracker = tracker; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientServiceCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientServiceCallable.java index 5fa8de10dbc..00e955807de 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientServiceCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientServiceCallable.java @@ -33,9 +33,10 @@ import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController; @InterfaceAudience.Private public abstract class ClientServiceCallable extends RegionServerCallable { + public ClientServiceCallable(Connection connection, TableName tableName, byte [] row, - RpcController rpcController) { - super(connection, tableName, row, rpcController); + RpcController rpcController, int priority) { + super(connection, tableName, row, rpcController, priority); } @Override diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java index 
0b3769d5ce2..351d8a6a812 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java @@ -147,6 +147,7 @@ public class Delete extends Mutation implements Comparable { for (Map.Entry entry : d.getAttributesMap().entrySet()) { this.setAttribute(entry.getKey(), entry.getValue()); } + super.setPriority(d.getPriority()); } /** @@ -369,4 +370,9 @@ public class Delete extends Mutation implements Comparable { public Delete setTTL(long ttl) { throw new UnsupportedOperationException("Setting TTLs on Deletes is not supported"); } + + @Override + public Delete setPriority(int priority) { + return (Delete) super.setPriority(priority); + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java index c3ddc4b8e89..b774a9a9396 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java @@ -127,6 +127,7 @@ public class Get extends Query TimeRange tr = entry.getValue(); setColumnFamilyTimeRange(entry.getKey(), tr.getMin(), tr.getMax()); } + super.setPriority(get.getPriority()); } /** @@ -552,4 +553,8 @@ public class Get extends Query return (Get) super.setIsolationLevel(level); } + @Override + public Get setPriority(int priority) { + return (Get) super.setPriority(priority); + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index a48b9e01b1f..c0d321bdbc5 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -415,7 +415,7 @@ public class HTable implements Table { if (get.getConsistency() == Consistency.STRONG) { final Get configuredGet = get; ClientServiceCallable callable = new ClientServiceCallable(this.connection, getName(), - get.getRow(), this.rpcControllerFactory.newController()) { + get.getRow(), this.rpcControllerFactory.newController(), get.getPriority()) { @Override protected Result rpcCall() throws Exception { ClientProtos.GetRequest request = RequestConverter.buildGetRequest( @@ -547,7 +547,7 @@ public class HTable implements Table { CancellableRegionServerCallable callable = new CancellableRegionServerCallable( connection, getName(), delete.getRow(), this.rpcControllerFactory.newController(), - writeRpcTimeout, new RetryingTimeTracker().start()) { + writeRpcTimeout, new RetryingTimeTracker().start(), delete.getPriority()) { @Override protected SingleResponse rpcCall() throws Exception { MutateRequest request = RequestConverter.buildMutateRequest( @@ -624,7 +624,7 @@ public class HTable implements Table { public void mutateRow(final RowMutations rm) throws IOException { CancellableRegionServerCallable callable = new CancellableRegionServerCallable(this.connection, getName(), rm.getRow(), - rpcControllerFactory.newController(), writeRpcTimeout, new RetryingTimeTracker().start()){ + rpcControllerFactory.newController(), writeRpcTimeout, new RetryingTimeTracker().start(), rm.getMaxPriority()){ @Override protected MultiResponse rpcCall() throws Exception { RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction( @@ -668,7 +668,7 @@ public class HTable implements Table { checkHasFamilies(append); NoncedRegionServerCallable callable = new NoncedRegionServerCallable(this.connection, 
getName(), append.getRow(), - this.rpcControllerFactory.newController()) { + this.rpcControllerFactory.newController(), append.getPriority()) { @Override protected Result rpcCall() throws Exception { MutateRequest request = RequestConverter.buildMutateRequest( @@ -690,7 +690,7 @@ public class HTable implements Table { checkHasFamilies(increment); NoncedRegionServerCallable callable = new NoncedRegionServerCallable(this.connection, getName(), increment.getRow(), - this.rpcControllerFactory.newController()) { + this.rpcControllerFactory.newController(), increment.getPriority()) { @Override protected Result rpcCall() throws Exception { MutateRequest request = RequestConverter.buildMutateRequest( @@ -734,7 +734,7 @@ public class HTable implements Table { NoncedRegionServerCallable callable = new NoncedRegionServerCallable(this.connection, getName(), row, - this.rpcControllerFactory.newController()) { + this.rpcControllerFactory.newController(), HConstants.PRIORITY_UNSET) { @Override protected Long rpcCall() throws Exception { MutateRequest request = RequestConverter.buildIncrementRequest( @@ -758,7 +758,7 @@ public class HTable implements Table { final Put put) throws IOException { ClientServiceCallable callable = new ClientServiceCallable(this.connection, getName(), row, - this.rpcControllerFactory.newController()) { + this.rpcControllerFactory.newController(), put.getPriority()) { @Override protected Boolean rpcCall() throws Exception { MutateRequest request = RequestConverter.buildMutateRequest( @@ -782,7 +782,7 @@ public class HTable implements Table { throws IOException { ClientServiceCallable callable = new ClientServiceCallable(this.connection, getName(), row, - this.rpcControllerFactory.newController()) { + this.rpcControllerFactory.newController(), put.getPriority()) { @Override protected Boolean rpcCall() throws Exception { CompareType compareType = CompareType.valueOf(compareOp.name()); @@ -817,7 +817,7 @@ public class HTable implements Table { CancellableRegionServerCallable callable = new CancellableRegionServerCallable( this.connection, getName(), row, this.rpcControllerFactory.newController(), - writeRpcTimeout, new RetryingTimeTracker().start()) { + writeRpcTimeout, new RetryingTimeTracker().start(), delete.getPriority()) { @Override protected SingleResponse rpcCall() throws Exception { CompareType compareType = CompareType.valueOf(compareOp.name()); @@ -858,7 +858,7 @@ public class HTable implements Table { throws IOException { CancellableRegionServerCallable callable = new CancellableRegionServerCallable(connection, getName(), rm.getRow(), - rpcControllerFactory.newController(), writeRpcTimeout, new RetryingTimeTracker().start()) { + rpcControllerFactory.newController(), writeRpcTimeout, new RetryingTimeTracker().start(), rm.getMaxPriority()) { @Override protected MultiResponse rpcCall() throws Exception { CompareType compareType = CompareType.valueOf(compareOp.name()); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java index 4ba0efa7950..d32355513f8 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java @@ -84,6 +84,7 @@ public class Increment extends Mutation implements Comparable { for (Map.Entry entry : i.getAttributesMap().entrySet()) { this.setAttribute(entry.getKey(), entry.getValue()); } + super.setPriority(i.getPriority()); } /** @@ -331,4 +332,9 @@ 
public class Increment extends Mutation implements Comparable { public Increment setTTL(long ttl) { return (Increment) super.setTTL(ttl); } + + @Override + public Increment setPriority(int priority) { + return (Increment) super.setPriority(priority); + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java index a4aa71d8208..bcec395371b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java @@ -20,11 +20,16 @@ package org.apache.hadoop.hbase.client; import java.util.ArrayList; import java.util.Collections; +import java.util.Comparator; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.TreeMap; +import java.util.function.Consumer; +import java.util.function.Predicate; +import com.google.common.collect.Iterables; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.util.Bytes; @@ -103,4 +108,11 @@ public final class MultiAction { public long getNonceGroup() { return this.nonceGroup; } + + // returns the max priority of all the actions + public int getPriority() { + Optional<Action> result = actions.values().stream().flatMap(List::stream) + .max(Comparator.comparingInt(Action::getPriority)); + return result.isPresent() ? result.get().getPriority() : HConstants.PRIORITY_UNSET; + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java index 64dada0bd6d..33c9a0bb2dd 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java @@ -55,8 +55,8 @@ class MultiServerCallable extends CancellableRegionServerCallable MultiServerCallable(final ClusterConnection connection, final TableName tableName, final ServerName location, final MultiAction multi, RpcController rpcController, - int rpcTimeout, RetryingTimeTracker tracker) { - super(connection, tableName, null, rpcController, rpcTimeout, tracker); + int rpcTimeout, RetryingTimeTracker tracker, int priority) { + super(connection, tableName, null, rpcController, rpcTimeout, tracker, priority); this.multiAction = multi; // RegionServerCallable has HRegionLocation field, but this is a multi-region request. // Using region info from parent HRegionLocation would be a mistake for this class; so diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java index f6cb4b1b2ac..3b6049729a0 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java @@ -71,7 +71,10 @@ public abstract class Mutation extends OperationWithAttributes implements Row, C // familyMap ClassSize.REFERENCE + // familyMap - ClassSize.TREEMAP); + ClassSize.TREEMAP + + // priority + ClassSize.INTEGER + ); /** * The attribute for storing the list of clusters that have consumed the change.
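// Editor's note: minimal usage sketch, not part of the patch. It illustrates how the
// per-operation priority added above (OperationWithAttributes.setPriority and the
// overrides on Get/Delete/Append/Increment/Scan) is meant to be used from client code:
// HTable and AsyncProcess read the operation's priority and pass it to the callables,
// which forward it to the HBaseRpcController. The table, family, qualifier and row
// names below are placeholders, not anything defined by the patch.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class PriorityUsageExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    try (Connection conn = ConnectionFactory.createConnection(conf);
         Table table = conn.getTable(TableName.valueOf("example_table"))) {
      // A read that should be served by the high-priority handler pool:
      // HTable.get builds its ClientServiceCallable with get.getPriority().
      Get get = new Get(Bytes.toBytes("row-1"));
      get.setPriority(HConstants.ADMIN_QOS);
      Result result = table.get(get);
      System.out.println("got " + result.size() + " cells");

      // A write carrying an explicit priority; it is picked up on the client side
      // (AsyncProcess / MultiServerCallable) and set on the HBaseRpcController.
      Put put = new Put(Bytes.toBytes("row-1"));
      put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v"));
      put.setPriority(HConstants.HIGH_QOS);
      table.put(put);
    }
  }
}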
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoncedRegionServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoncedRegionServerCallable.java index 52ed26374ea..5dc19f6fe93 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoncedRegionServerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoncedRegionServerCallable.java @@ -47,8 +47,8 @@ public abstract class NoncedRegionServerCallable extends ClientServiceCallabl * @param row The row we want in tableName. */ public NoncedRegionServerCallable(Connection connection, TableName tableName, byte [] row, - HBaseRpcController rpcController) { - super(connection, tableName, row, rpcController); + HBaseRpcController rpcController, int priority) { + super(connection, tableName, row, rpcController, priority); this.nonce = getConnection().getNonceGenerator().newNonce(); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/OperationWithAttributes.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/OperationWithAttributes.java index ba21cbbdb6f..1fb691a6c25 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/OperationWithAttributes.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/OperationWithAttributes.java @@ -23,6 +23,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ClassSize; @@ -34,6 +35,7 @@ public abstract class OperationWithAttributes extends Operation implements Attri // used for uniquely identifying an operation public static final String ID_ATRIBUTE = "_operation.attributes.id"; + private int priority = HConstants.PRIORITY_UNSET; @Override public OperationWithAttributes setAttribute(String name, byte[] value) { @@ -108,4 +110,14 @@ public abstract class OperationWithAttributes extends Operation implements Attri byte[] attr = getAttribute(ID_ATRIBUTE); return attr == null? 
null: Bytes.toString(attr); } + + public OperationWithAttributes setPriority(int priority) { + this.priority = priority; + return this; + } + + public int getPriority() { + return priority; + } + } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionCoprocessorRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionCoprocessorRpcChannel.java index 3b10549923f..df7d74f8c24 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionCoprocessorRpcChannel.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionCoprocessorRpcChannel.java @@ -21,6 +21,7 @@ import java.io.IOException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; @@ -77,7 +78,7 @@ class RegionCoprocessorRpcChannel extends SyncCoprocessorRpcChannel { } ClientServiceCallable callable = new ClientServiceCallable(this.conn, - this.table, this.row, this.conn.getRpcControllerFactory().newController()) { + this.table, this.row, this.conn.getRpcControllerFactory().newController(), HConstants.PRIORITY_UNSET) { @Override protected CoprocessorServiceResponse rpcCall() throws Exception { byte [] regionName = getLocation().getRegionInfo().getRegionName(); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java index fb593a3def5..499685d5d42 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.client; import java.io.IOException; import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.ServerName; @@ -66,6 +67,7 @@ public abstract class RegionServerCallable implements RetryingCallable * Can be null! */ protected final RpcController rpcController; + private int priority = HConstants.NORMAL_QOS; /** * @param connection Connection to use. @@ -75,11 +77,17 @@ public abstract class RegionServerCallable implements RetryingCallable */ public RegionServerCallable(Connection connection, TableName tableName, byte [] row, RpcController rpcController) { + this(connection, tableName, row, rpcController, HConstants.NORMAL_QOS); + } + + public RegionServerCallable(Connection connection, TableName tableName, byte [] row, + RpcController rpcController, int priority) { super(); this.connection = connection; this.tableName = tableName; this.row = row; this.rpcController = rpcController; + this.priority = priority; } protected RpcController getRpcController() { @@ -111,6 +119,7 @@ public abstract class RegionServerCallable implements RetryingCallable // If it is an instance of HBaseRpcController, we can set priority on the controller based // off the tableName. Set call timeout too. 
hrc.setPriority(tableName); + hrc.setPriority(priority); hrc.setCallTimeout(callTimeout); } } @@ -172,6 +181,8 @@ public abstract class RegionServerCallable implements RetryingCallable return this.row; } + protected int getPriority() { return this.priority;} + public void throwable(Throwable t, boolean retrying) { if (location != null) { getConnection().updateCachedLocations(tableName, location.getRegionInfo().getRegionName(), diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowMutations.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowMutations.java index a9384aca910..a6d6d39c683 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowMutations.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowMutations.java @@ -118,4 +118,12 @@ public class RowMutations implements Row { public List getMutations() { return Collections.unmodifiableList(mutations); } + + public int getMaxPriority() { + int maxPriority = Integer.MIN_VALUE; + for (Mutation mutation : mutations) { + maxPriority = Math.max(maxPriority, mutation.getPriority()); + } + return maxPriority; + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java index b5cddde2be5..3cd9b2f48e3 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java @@ -27,7 +27,6 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -45,6 +44,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import static org.apache.hadoop.hbase.HConstants.PRIORITY_UNSET; /** * Caller that goes to replica if the primary region does no answer within a configurable @@ -96,7 +96,7 @@ public class RpcRetryingCallerWithReadReplicas { public ReplicaRegionServerCallable(int id, HRegionLocation location) { super(RpcRetryingCallerWithReadReplicas.this.cConnection, RpcRetryingCallerWithReadReplicas.this.tableName, get.getRow(), - rpcControllerFactory.newController(), rpcTimeout, new RetryingTimeTracker()); + rpcControllerFactory.newController(), rpcTimeout, new RetryingTimeTracker(), PRIORITY_UNSET); this.id = id; this.location = location; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java index 639f43e4e87..e84716f5103 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java @@ -276,6 +276,7 @@ public class Scan extends Query { this.mvccReadPoint = scan.getMvccReadPoint(); this.limit = scan.getLimit(); this.needCursorResult = scan.isNeedCursorResult(); + setPriority(scan.getPriority()); } /** @@ -306,6 +307,7 @@ public class Scan extends Query { setColumnFamilyTimeRange(entry.getKey(), tr.getMin(), tr.getMax()); } this.mvccReadPoint = -1L; + setPriority(get.getPriority()); } public boolean isGetScan() { @@ 
-1060,6 +1062,11 @@ public class Scan extends Query { return (Scan) super.setIsolationLevel(level); } + @Override + public Scan setPriority(int priority) { + return (Scan) super.setPriority(priority); + } + /** * Enable collection of {@link ScanMetrics}. For advanced users. * @param enabled Set to true to enable accumulating scan metrics diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java index 4227e41eacb..bb8b185d733 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java @@ -117,7 +117,7 @@ public class ScannerCallable extends ClientServiceCallable { */ public ScannerCallable(ClusterConnection connection, TableName tableName, Scan scan, ScanMetrics scanMetrics, RpcControllerFactory rpcControllerFactory, int id) { - super(connection, tableName, scan.getStartRow(), rpcControllerFactory.newController()); + super(connection, tableName, scan.getStartRow(), rpcControllerFactory.newController(), scan.getPriority()); this.id = id; this.scan = scan; this.scanMetrics = scanMetrics; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java index c8d973846d5..aa9f645fa19 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java @@ -22,7 +22,6 @@ import java.io.IOException; import java.util.List; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; @@ -39,6 +38,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpeci import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; import org.apache.hadoop.security.token.Token; +import static org.apache.hadoop.hbase.HConstants.PRIORITY_UNSET; + /** * Client proxy for SecureBulkLoadProtocol */ @@ -56,7 +57,7 @@ public class SecureBulkLoadClient { try { ClientServiceCallable callable = new ClientServiceCallable(conn, table.getName(), HConstants.EMPTY_START_ROW, - this.rpcControllerFactory.newController()) { + this.rpcControllerFactory.newController(), PRIORITY_UNSET) { @Override protected String rpcCall() throws Exception { byte[] regionName = getLocation().getRegionInfo().getRegionName(); @@ -79,7 +80,7 @@ public class SecureBulkLoadClient { public void cleanupBulkLoad(final Connection conn, final String bulkToken) throws IOException { try { ClientServiceCallable callable = new ClientServiceCallable(conn, - table.getName(), HConstants.EMPTY_START_ROW, this.rpcControllerFactory.newController()) { + table.getName(), HConstants.EMPTY_START_ROW, this.rpcControllerFactory.newController(), PRIORITY_UNSET) { @Override protected Void rpcCall() throws Exception { byte[] regionName = getLocation().getRegionInfo().getRegionName(); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java index 71ce70a14f4..b925330c800 100644 --- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java @@ -37,8 +37,6 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; @InterfaceAudience.Private public interface HBaseRpcController extends RpcController, CellScannable { - static final int PRIORITY_UNSET = -1; - /** * Only used to send cells to rpc server, the returned cells should be set by * {@link #setDone(CellScanner)}. diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java index 8ceac64f7ca..64d91f33b2d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java @@ -56,7 +56,7 @@ public class HBaseRpcControllerImpl implements HBaseRpcController { * This is the ordained way of setting priorities going forward. We will be undoing the old * annotation-based mechanism. */ - private int priority = PRIORITY_UNSET; + private int priority = HConstants.PRIORITY_UNSET; /** * They are optionally set on construction, cleared after we make the call, and then optionally @@ -95,7 +95,8 @@ public class HBaseRpcControllerImpl implements HBaseRpcController { @Override public void setPriority(int priority) { - this.priority = priority; + this.priority = Math.max(this.priority, priority); + } @Override @@ -106,7 +107,7 @@ public class HBaseRpcControllerImpl implements HBaseRpcController { @Override public int getPriority() { - return priority; + return priority < 0 ? HConstants.NORMAL_QOS : priority; } @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC", diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java index 6dab3b59062..e0636eb55d1 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java @@ -29,6 +29,7 @@ import java.net.SocketTimeoutException; import java.nio.ByteBuffer; import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.exceptions.ConnectionClosingException; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta; @@ -111,7 +112,7 @@ class IPCUtil { builder.setCellBlockMeta(cellBlockMeta); } // Only pass priority if there is one set. - if (call.priority != HBaseRpcController.PRIORITY_UNSET) { + if (call.priority != HConstants.PRIORITY_UNSET) { builder.setPriority(call.priority); } builder.setTimeout(call.timeout); diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index dfc140b71ee..54e0eb8c235 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -1113,6 +1113,7 @@ public final class HConstants { * handled by high priority handlers. 
*/ // normal_QOS < replication_QOS < replay_QOS < QOS_threshold < admin_QOS < high_QOS + public static final int PRIORITY_UNSET = -1; public static final int NORMAL_QOS = 0; public static final int REPLICATION_QOS = 5; public static final int REPLAY_QOS = 6; diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java index a7709ee1041..848934c4a5c 100644 --- a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java +++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java @@ -28,6 +28,8 @@ import java.io.IOException; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.curator.shaded.com.google.common.collect.ConcurrentHashMultiset; +import org.apache.curator.shaded.com.google.common.collect.Multiset; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CellScannable; import org.apache.hadoop.hbase.CellScanner; @@ -76,6 +78,7 @@ public class TestRpcControllerFactory { public static class CountingRpcController extends DelegatingHBaseRpcController { + private static Multiset GROUPED_PRIORITY = ConcurrentHashMultiset.create(); private static AtomicInteger INT_PRIORITY = new AtomicInteger(); private static AtomicInteger TABLE_PRIORITY = new AtomicInteger(); @@ -85,8 +88,13 @@ public class TestRpcControllerFactory { @Override public void setPriority(int priority) { + int oldPriority = getPriority(); super.setPriority(priority); - INT_PRIORITY.incrementAndGet(); + int newPriority = getPriority(); + if (newPriority != oldPriority) { + INT_PRIORITY.incrementAndGet(); + GROUPED_PRIORITY.add(priority); + } } @Override @@ -196,6 +204,14 @@ public class TestRpcControllerFactory { scanInfo.setSmall(false); counter = doScan(table, scanInfo, counter + 1); + // make sure we have no priority count + verifyPriorityGroupCount(HConstants.ADMIN_QOS, 0); + // lets set a custom priority on a get + Get get = new Get(row); + get.setPriority(HConstants.ADMIN_QOS); + table.get(get); + verifyPriorityGroupCount(HConstants.ADMIN_QOS, 1); + table.close(); connection.close(); } @@ -208,11 +224,15 @@ public class TestRpcControllerFactory { } int verifyCount(Integer counter) { - assertTrue(CountingRpcController.TABLE_PRIORITY.get() >= counter.intValue()); + assertTrue(CountingRpcController.TABLE_PRIORITY.get() >= counter); assertEquals(0, CountingRpcController.INT_PRIORITY.get()); return CountingRpcController.TABLE_PRIORITY.get() + 1; } + void verifyPriorityGroupCount(int priorityLevel, int count) { + assertEquals(count, CountingRpcController.GROUPED_PRIORITY.count(priorityLevel)); + } + @Test public void testFallbackToDefaultRpcControllerFactory() { Configuration conf = new Configuration(UTIL.getConfiguration()); diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldSecureEndpoint.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldSecureEndpoint.java index 2c38662e680..0d5c993eeea 100644 --- a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldSecureEndpoint.java +++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldSecureEndpoint.java @@ -27,6 +27,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import 
org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread; import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext; import org.apache.hadoop.hbase.TableName; @@ -108,7 +109,7 @@ public class TestHRegionServerBulkLoadWithOldSecureEndpoint extends TestHRegionS RpcControllerFactory rpcControllerFactory = new RpcControllerFactory(UTIL.getConfiguration()); ClientServiceCallable callable = new ClientServiceCallable(conn, tableName, Bytes.toBytes("aaa"), - rpcControllerFactory.newController()) { + rpcControllerFactory.newController(), HConstants.PRIORITY_UNSET) { @Override protected Void rpcCall() throws Exception { LOG.debug("Going to connect to server " + getLocation() + " for row " + @@ -128,7 +129,7 @@ public class TestHRegionServerBulkLoadWithOldSecureEndpoint extends TestHRegionS if (numBulkLoads.get() % 5 == 0) { // 5 * 50 = 250 open file handles! callable = new ClientServiceCallable(conn, tableName, Bytes.toBytes("aaa"), - rpcControllerFactory.newController()) { + rpcControllerFactory.newController(), HConstants.PRIORITY_UNSET) { @Override protected Void rpcCall() throws Exception { LOG.debug("compacting " + getLocation() + " for row " diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java index 2ee2d7ed660..900861bb607 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java @@ -155,6 +155,9 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs public boolean dispatch(CallRunner callTask) throws InterruptedException { RpcCall call = callTask.getRpcCall(); int level = priority.getPriority(call.getHeader(), call.getParam(), call.getRequestUser()); + if (level == HConstants.PRIORITY_UNSET) { + level = HConstants.NORMAL_QOS; + } if (priorityExecutor != null && level > highPriorityLevel) { return priorityExecutor.dispatch(callTask); } else if (replicationExecutor != null && level == HConstants.REPLICATION_QOS) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java index 4191aa869b9..7b4a353533a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java @@ -530,7 +530,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { } return new ClientServiceCallable(conn, - tableName, first, rpcControllerFactory.newController()) { + tableName, first, rpcControllerFactory.newController(), HConstants.PRIORITY_UNSET) { @Override protected byte[] rpcCall() throws Exception { SecureBulkLoadClient secureClient = null; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java index f4512076164..c616a01bcd5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java @@ -183,7 +183,7 @@ public class WALEditsReplaySink { ReplayServerCallable(final 
Connection connection, RpcControllerFactory rpcControllerFactory, final TableName tableName, final HRegionLocation regionLoc, final List entries) { super(connection, tableName, HConstants.EMPTY_BYTE_ARRAY, - rpcControllerFactory.newController()); + rpcControllerFactory.newController(), HConstants.PRIORITY_UNSET); this.entries = entries; setLocation(regionLoc); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java index 037a538060f..1ef6c6088bb 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java @@ -668,7 +668,7 @@ public class TestHCM { TEST_UTIL.createTable(tableName, FAM_NAM); ClientServiceCallable regionServerCallable = new ClientServiceCallable( TEST_UTIL.getConnection(), tableName, ROW, - new RpcControllerFactory(TEST_UTIL.getConfiguration()).newController()) { + new RpcControllerFactory(TEST_UTIL.getConfiguration()).newController(), HConstants.PRIORITY_UNSET) { @Override protected Object rpcCall() throws Exception { return null; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java index 437afaf9639..898f629437f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java @@ -475,7 +475,7 @@ public class TestReplicaWithCluster { new SecureBulkLoadClient(HTU.getConfiguration(), table).prepareBulkLoad(conn); ClientServiceCallable callable = new ClientServiceCallable(conn, hdt.getTableName(), TestHRegionServerBulkLoad.rowkey(0), - new RpcControllerFactory(HTU.getConfiguration()).newController()) { + new RpcControllerFactory(HTU.getConfiguration()).newController(), HConstants.PRIORITY_UNSET) { @Override protected Void rpcCall() throws Exception { LOG.debug("Going to connect to server " + getLocation() + " for row " diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java index bf74a9eb946..8a6e19b4dbb 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java @@ -471,6 +471,7 @@ public class TestHeapSize { expected = ClassSize.estimateBase(cl, false); //The actual TreeMap is not included in the above calculation expected += ClassSize.align(ClassSize.TREEMAP); + expected += ClassSize.align(ClassSize.INTEGER); // priority if (expected != actual) { ClassSize.estimateBase(cl, true); assertEquals(expected, actual); @@ -481,6 +482,7 @@ public class TestHeapSize { expected = ClassSize.estimateBase(cl, false); //The actual TreeMap is not included in the above calculation expected += ClassSize.align(ClassSize.TREEMAP); + expected += ClassSize.align(ClassSize.INTEGER); // priority if (expected != actual) { ClassSize.estimateBase(cl, true); assertEquals(expected, actual); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java index 32ebbd2a19d..e1aa1371171 100644 --- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java @@ -354,7 +354,7 @@ public class TestLoadIncrementalHFilesSplitRecovery { HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER) - 1) { ClientServiceCallable newServerCallable = new ClientServiceCallable( conn, tableName, first, new RpcControllerFactory( - util.getConfiguration()).newController()) { + util.getConfiguration()).newController(), HConstants.PRIORITY_UNSET) { @Override public byte[] rpcCall() throws Exception { throw new IOException("Error calling something on RegionServer"); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotas.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotas.java index 23a55e2cb72..e52b139daa8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotas.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotas.java @@ -37,6 +37,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; @@ -451,7 +452,7 @@ public class TestSpaceQuotas { Table table = conn.getTable(tn); final String bulkToken = new SecureBulkLoadClient(conf, table).prepareBulkLoad(conn); return new ClientServiceCallable(conn, - tn, Bytes.toBytes("row"), new RpcControllerFactory(conf).newController()) { + tn, Bytes.toBytes("row"), new RpcControllerFactory(conf).newController(), HConstants.PRIORITY_UNSET) { @Override public Void rpcCall() throws Exception { SecureBulkLoadClient secureClient = null; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java index c17234ee90f..b5304f43369 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java @@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueUtil; @@ -205,7 +206,7 @@ public class TestHRegionServerBulkLoad { prepareBulkLoad(conn); ClientServiceCallable callable = new ClientServiceCallable(conn, tableName, Bytes.toBytes("aaa"), - new RpcControllerFactory(UTIL.getConfiguration()).newController()) { + new RpcControllerFactory(UTIL.getConfiguration()).newController(), HConstants.PRIORITY_UNSET) { @Override public Void rpcCall() throws Exception { LOG.debug("Going to connect to server " + getLocation() + " for row " @@ -229,7 +230,7 @@ public class TestHRegionServerBulkLoad { // 5 * 50 = 250 open file handles! 
callable = new ClientServiceCallable(conn, tableName, Bytes.toBytes("aaa"), - new RpcControllerFactory(UTIL.getConfiguration()).newController()) { + new RpcControllerFactory(UTIL.getConfiguration()).newController(), HConstants.PRIORITY_UNSET) { @Override protected Void rpcCall() throws Exception { LOG.debug("compacting " + getLocation() + " for row " diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldClient.java index 2a1655db0b5..7f486e4e574 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldClient.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldClient.java @@ -27,6 +27,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread; import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext; import org.apache.hadoop.hbase.TableName; @@ -94,7 +95,7 @@ public class TestHRegionServerBulkLoadWithOldClient extends TestHRegionServerBul RpcControllerFactory rpcControllerFactory = new RpcControllerFactory(UTIL.getConfiguration()); ClientServiceCallable callable = new ClientServiceCallable(conn, tableName, - Bytes.toBytes("aaa"), rpcControllerFactory.newController()) { + Bytes.toBytes("aaa"), rpcControllerFactory.newController(), HConstants.PRIORITY_UNSET) { @Override protected Void rpcCall() throws Exception { LOG.info("Non-secure old client"); @@ -114,7 +115,7 @@ public class TestHRegionServerBulkLoadWithOldClient extends TestHRegionServerBul if (numBulkLoads.get() % 5 == 0) { // 5 * 50 = 250 open file handles! callable = new ClientServiceCallable(conn, tableName, - Bytes.toBytes("aaa"), rpcControllerFactory.newController()) { + Bytes.toBytes("aaa"), rpcControllerFactory.newController(), HConstants.PRIORITY_UNSET) { @Override protected Void rpcCall() throws Exception { LOG.debug("compacting " + getLocation() + " for row "
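// Editor's note: standalone sketch, not part of the patch. It mirrors how an unset
// priority is expected to resolve after these changes: callers may pass
// HConstants.PRIORITY_UNSET (-1); HBaseRpcControllerImpl.getPriority() and
// SimpleRpcScheduler.dispatch() both fall back to HConstants.NORMAL_QOS, and
// setPriority() only ever raises an already-set priority (it takes the max of the
// old and new value). The class and constant names here are illustrative only.
public class PriorityResolutionSketch {
  static final int PRIORITY_UNSET = -1; // mirrors HConstants.PRIORITY_UNSET
  static final int NORMAL_QOS = 0;      // mirrors HConstants.NORMAL_QOS

  private int priority = PRIORITY_UNSET;

  // Mirrors HBaseRpcControllerImpl.setPriority(int): never lowers a priority already set.
  void setPriority(int priority) {
    this.priority = Math.max(this.priority, priority);
  }

  // Mirrors HBaseRpcControllerImpl.getPriority(): an unset priority resolves to NORMAL_QOS.
  int getPriority() {
    return priority < 0 ? NORMAL_QOS : priority;
  }

  public static void main(String[] args) {
    PriorityResolutionSketch controller = new PriorityResolutionSketch();
    System.out.println(controller.getPriority()); // 0 (NORMAL_QOS) while still unset
    controller.setPriority(100);                  // e.g. HConstants.ADMIN_QOS
    controller.setPriority(PRIORITY_UNSET);       // a later "unset" does not lower it
    System.out.println(controller.getPriority()); // 100
  }
}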