From 26247996d25dad38678fed2e2a1b8f0d383df082 Mon Sep 17 00:00:00 2001 From: rgidwani Date: Fri, 21 Jul 2017 12:20:24 -0700 Subject: [PATCH] HBASE-15816 Provide client with ability to set priority on Operations Signed-off-by: Andrew Purtell --- .../apache/hadoop/hbase/client/Action.java | 8 ++++++ .../apache/hadoop/hbase/client/Append.java | 6 +++++ .../hadoop/hbase/client/AsyncProcess.java | 18 ++++++++++--- .../apache/hadoop/hbase/client/Delete.java | 7 +++++ .../org/apache/hadoop/hbase/client/Get.java | 6 +++++ .../apache/hadoop/hbase/client/HTable.java | 27 ++++++++++++------- .../apache/hadoop/hbase/client/Increment.java | 6 +++++ .../hadoop/hbase/client/MultiAction.java | 10 +++++++ .../hbase/client/MultiServerCallable.java | 5 ++-- .../apache/hadoop/hbase/client/Mutation.java | 5 +++- .../hbase/client/OperationWithAttributes.java | 11 ++++++++ .../client/PayloadCarryingServerCallable.java | 10 +++++-- .../hbase/client/RegionServerCallable.java | 11 ++++++++ .../hadoop/hbase/client/RowMutations.java | 8 ++++++ .../RpcRetryingCallerWithReadReplicas.java | 3 ++- .../org/apache/hadoop/hbase/client/Scan.java | 7 +++++ .../hadoop/hbase/client/ScannerCallable.java | 2 +- .../hadoop/hbase/ipc/HBaseRpcController.java | 2 -- .../hbase/ipc/HBaseRpcControllerImpl.java | 6 ++--- .../org/apache/hadoop/hbase/ipc/IPCUtil.java | 3 ++- .../ipc/RegionCoprocessorRpcChannel.java | 3 ++- .../org/apache/hadoop/hbase/HConstants.java | 1 + .../client/TestRpcControllerFactory.java | 27 ++++++++++++++++--- .../apache/hadoop/hbase/io/TestHeapSize.java | 2 ++ 24 files changed, 164 insertions(+), 30 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Action.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Action.java index 2bc5d79dc9b..5417b6bee88 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Action.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Action.java @@ -34,11 +34,17 @@ public class Action implements Comparable { private int originalIndex; private long nonce = HConstants.NO_NONCE; private int replicaId = RegionReplicaUtil.DEFAULT_REPLICA_ID; + private int priority; public Action(Row action, int originalIndex) { + this(action, originalIndex, HConstants.PRIORITY_UNSET); + } + + public Action(Row action, int originalIndex, int priority) { super(); this.action = action; this.originalIndex = originalIndex; + this.priority = priority; } /** @@ -75,6 +81,8 @@ public class Action implements Comparable { return replicaId; } + public int getPriority() { return priority; } + @SuppressWarnings("rawtypes") @Override public int compareTo(Object o) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Append.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Append.java index f20f7274ca2..ec4ea37d9bd 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Append.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Append.java @@ -86,6 +86,7 @@ public class Append extends Mutation { for (Map.Entry entry : a.getAttributesMap().entrySet()) { this.setAttribute(entry.getKey(), entry.getValue()); } + this.setPriority(a.getPriority()); } /** Create a Append operation for the specified row. @@ -183,6 +184,11 @@ public class Append extends Mutation { return (Append) super.setACL(perms); } + @Override + public Append setPriority(int priority) { + return (Append) super.setPriority(priority); + } + @Override public Append setTTL(long ttl) { return (Append) super.setTTL(ttl); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java index 73cafc11236..10d4f38a3bb 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java @@ -504,7 +504,11 @@ class AsyncProcess { LOG.error("Failed to get region location ", ex); // This action failed before creating ars. Retain it, but do not add to submit list. // We will then add it to ars in an already-failed state. - retainedActions.add(new Action(r, ++posInList)); + int priority = HConstants.NORMAL_QOS; + if (r instanceof Mutation) { + priority = ((Mutation) r).getPriority(); + } + retainedActions.add(new Action(r, ++posInList, priority)); locationErrors.add(ex); locationErrorRows.add(posInList); it.remove(); @@ -516,7 +520,11 @@ class AsyncProcess { break; } if (code == ReturnCode.INCLUDE) { - Action action = new Action(r, ++posInList); + int priority = HConstants.NORMAL_QOS; + if (r instanceof Mutation) { + priority = ((Mutation) r).getPriority(); + } + Action action = new Action(r, ++posInList, priority); setNonce(ng, r, action); retainedActions.add(action); // TODO: replica-get is not supported on this path @@ -619,6 +627,7 @@ class AsyncProcess { // The position will be used by the processBatch to match the object array returned. int posInList = -1; NonceGenerator ng = this.connection.getNonceGenerator(); + int highestPriority = HConstants.PRIORITY_UNSET; for (Row r : rows) { posInList++; if (r instanceof Put) { @@ -626,8 +635,9 @@ class AsyncProcess { if (put.isEmpty()) { throw new IllegalArgumentException("No columns to insert for #" + (posInList+1)+ " item"); } + highestPriority = Math.max(put.getPriority(), highestPriority); } - Action action = new Action(r, posInList); + Action action = new Action(r, posInList, highestPriority); setNonce(ng, r, action); actions.add(action); } @@ -1782,7 +1792,7 @@ class AsyncProcess { protected MultiServerCallable createCallable(final ServerName server, TableName tableName, final MultiAction multi) { return new MultiServerCallable(connection, tableName, server, - AsyncProcess.this.rpcFactory, multi, rpcTimeout, tracker); + AsyncProcess.this.rpcFactory, multi, rpcTimeout, tracker, multi.getPriority()); } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java index bdacf932ce3..4e1fe09ccb0 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java @@ -147,6 +147,7 @@ public class Delete extends Mutation implements Comparable { for (Map.Entry entry : d.getAttributesMap().entrySet()) { this.setAttribute(entry.getKey(), entry.getValue()); } + super.setPriority(d.getPriority()); } /** @@ -478,4 +479,10 @@ public class Delete extends Mutation implements Comparable { public Delete setTTL(long ttl) { throw new UnsupportedOperationException("Setting TTLs on Deletes is not supported"); } + + @Override + public Delete setPriority(int priority) { + return (Delete) super.setPriority(priority); + } + } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java index 88da0b02287..2a1e9f23d1b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java @@ -130,6 +130,7 @@ public class Get extends Query TimeRange tr = entry.getValue(); setColumnFamilyTimeRange(entry.getKey(), tr.getMin(), tr.getMax()); } + super.setPriority(get.getPriority()); } public boolean isCheckExistenceOnly() { @@ -511,4 +512,9 @@ public class Get extends Query return (Get) super.setIsolationLevel(level); } + @Override + public Get setPriority(int priority) { + return (Get) super.setPriority(priority); + } + } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index d4fa2e32ea2..e9531f32a4f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -846,13 +846,14 @@ public class HTable implements HTableInterface, RegionLocator { // Good old call. final Get getReq = get; RegionServerCallable callable = new RegionServerCallable(this.connection, - getName(), get.getRow()) { + getName(), get.getRow(), get.getPriority()) { @Override public Result call(int callTimeout) throws IOException { ClientProtos.GetRequest request = RequestConverter.buildGetRequest(getLocation().getRegionInfo().getRegionName(), getReq); HBaseRpcController controller = rpcControllerFactory.newController(); controller.setPriority(tableName); + controller.setPriority(getPriority()); controller.setCallTimeout(callTimeout); try { ClientProtos.GetResponse response = getStub().get(controller, request); @@ -973,11 +974,12 @@ public class HTable implements HTableInterface, RegionLocator { public void delete(final Delete delete) throws IOException { RegionServerCallable callable = new RegionServerCallable(connection, - tableName, delete.getRow()) { + tableName, delete.getRow(), delete.getPriority()) { @Override public Boolean call(int callTimeout) throws IOException { HBaseRpcController controller = rpcControllerFactory.newController(); controller.setPriority(tableName); + controller.setPriority(getPriority()); controller.setCallTimeout(callTimeout); try { @@ -1055,6 +1057,7 @@ public class HTable implements HTableInterface, RegionLocator { public MultiResponse call(int callTimeout) throws IOException { controller.reset(); controller.setPriority(tableName); + controller.setPriority(rm.getMaxPriority()); int remainingTime = tracker.getRemainingTime(callTimeout); if (remainingTime == 0) { throw new DoNotRetryIOException("Timeout for mutate row"); @@ -1103,12 +1106,12 @@ public class HTable implements HTableInterface, RegionLocator { NonceGenerator ng = this.connection.getNonceGenerator(); final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce(); RegionServerCallable callable = - new RegionServerCallable(this.connection, getName(), append.getRow()) { + new RegionServerCallable(this.connection, getName(), append.getRow(), append.getPriority()) { @Override public Result call(int callTimeout) throws IOException { HBaseRpcController controller = rpcControllerFactory.newController(); controller.setPriority(getTableName()); - controller.setCallTimeout(callTimeout); + controller.setCallTimeout(getPriority()); try { MutateRequest request = RequestConverter.buildMutateRequest( getLocation().getRegionInfo().getRegionName(), append, nonceGroup, nonce); @@ -1136,11 +1139,12 @@ public class HTable implements HTableInterface, RegionLocator { NonceGenerator ng = this.connection.getNonceGenerator(); final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce(); RegionServerCallable callable = new RegionServerCallable(this.connection, - getName(), increment.getRow()) { + getName(), increment.getRow(), increment.getPriority()) { @Override public Result call(int callTimeout) throws IOException { HBaseRpcController controller = rpcControllerFactory.newController(); controller.setPriority(getTableName()); + controller.setPriority(getPriority()); controller.setCallTimeout(callTimeout); try { MutateRequest request = RequestConverter.buildMutateRequest( @@ -1236,11 +1240,12 @@ public class HTable implements HTableInterface, RegionLocator { final Put put) throws IOException { RegionServerCallable callable = - new RegionServerCallable(connection, getName(), row) { + new RegionServerCallable(connection, getName(), row, put.getPriority()) { @Override public Boolean call(int callTimeout) throws IOException { HBaseRpcController controller = rpcControllerFactory.newController(); controller.setPriority(tableName); + controller.setPriority(getPriority()); controller.setCallTimeout(callTimeout); try { MutateRequest request = RequestConverter.buildMutateRequest( @@ -1266,11 +1271,12 @@ public class HTable implements HTableInterface, RegionLocator { final Put put) throws IOException { RegionServerCallable callable = - new RegionServerCallable(connection, getName(), row) { + new RegionServerCallable(connection, getName(), row, put.getPriority()) { @Override public Boolean call(int callTimeout) throws IOException { HBaseRpcController controller = rpcControllerFactory.newController(); controller.setPriority(tableName); + controller.setPriority(getPriority()); controller.setCallTimeout(callTimeout); try { CompareType compareType = CompareType.valueOf(compareOp.name()); @@ -1297,11 +1303,12 @@ public class HTable implements HTableInterface, RegionLocator { final Delete delete) throws IOException { RegionServerCallable callable = - new RegionServerCallable(connection, getName(), row) { + new RegionServerCallable(connection, getName(), row, delete.getPriority()) { @Override public Boolean call(int callTimeout) throws IOException { HBaseRpcController controller = rpcControllerFactory.newController(); controller.setPriority(tableName); + controller.setPriority(getPriority()); controller.setCallTimeout(callTimeout); try { MutateRequest request = RequestConverter.buildMutateRequest( @@ -1327,11 +1334,12 @@ public class HTable implements HTableInterface, RegionLocator { final Delete delete) throws IOException { RegionServerCallable callable = - new RegionServerCallable(connection, getName(), row) { + new RegionServerCallable(connection, getName(), row, delete.getPriority()) { @Override public Boolean call(int callTimeout) throws IOException { HBaseRpcController controller = rpcControllerFactory.newController(); controller.setPriority(tableName); + controller.setPriority(getPriority()); controller.setCallTimeout(callTimeout); try { CompareType compareType = CompareType.valueOf(compareOp.name()); @@ -1364,6 +1372,7 @@ public class HTable implements HTableInterface, RegionLocator { public MultiResponse call(int callTimeout) throws IOException { controller.reset(); controller.setPriority(tableName); + controller.setPriority(rm.getMaxPriority()); int remainingTime = tracker.getRemainingTime(callTimeout); if (remainingTime == 0) { throw new DoNotRetryIOException("Timeout for mutate row"); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java index b60cbde32fa..22885d8d95c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java @@ -86,6 +86,7 @@ public class Increment extends Mutation implements Comparable { for (Map.Entry entry : i.getAttributesMap().entrySet()) { this.setAttribute(entry.getKey(), entry.getValue()); } + super.setPriority(i.getPriority()); } /** @@ -351,4 +352,9 @@ public class Increment extends Mutation implements Comparable { public Increment setTTL(long ttl) { return (Increment) super.setTTL(ttl); } + + @Override + public Increment setPriority(int priority) { + return (Increment) super.setPriority(priority); + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java index 0a9055eb16d..3ab1dbf4408 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java @@ -104,4 +104,14 @@ public final class MultiAction { public long getNonceGroup() { return this.nonceGroup; } + + public int getPriority() { + int maxPriority = HConstants.PRIORITY_UNSET; + for (List> actionList : actions.values()) { + for (Action action : actionList) { + maxPriority = Math.max(maxPriority, action.getPriority()); + } + } + return maxPriority; + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java index 738ff6e1428..42c63eb5938 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java @@ -57,8 +57,8 @@ class MultiServerCallable extends PayloadCarryingServerCallable multi, - int rpcTimeout, RetryingTimeTracker tracker) { - super(connection, tableName, null, rpcFactory); + int rpcTimeout, RetryingTimeTracker tracker, int priority) { + super(connection, tableName, null, rpcFactory, priority); this.multiAction = multi; // RegionServerCallable has HRegionLocation field, but this is a multi-region request. // Using region info from parent HRegionLocation would be a mistake for this class; so @@ -130,6 +130,7 @@ class MultiServerCallable extends PayloadCarryingServerCallable protected HBaseRpcController controller; public PayloadCarryingServerCallable(Connection connection, TableName tableName, byte[] row, - RpcControllerFactory rpcControllerFactory) { - super(connection, tableName, row); + RpcControllerFactory rpcControllerFactory) { + this(connection, tableName, row, rpcControllerFactory, HConstants.NORMAL_QOS); + } + + public PayloadCarryingServerCallable(Connection connection, TableName tableName, byte[] row, + RpcControllerFactory rpcControllerFactory, int priority) { + super(connection, tableName, row, priority); this.controller = rpcControllerFactory.newController(); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java index e0b09f345fc..7eb09320980 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java @@ -23,6 +23,7 @@ import java.io.IOException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionLocation; @@ -50,6 +51,7 @@ public abstract class RegionServerCallable implements RetryingCallable { protected final byte[] row; protected HRegionLocation location; private ClientService.BlockingInterface stub; + protected int priority; /** * @param connection Connection to use. @@ -57,9 +59,14 @@ public abstract class RegionServerCallable implements RetryingCallable { * @param row The row we want in tableName. */ public RegionServerCallable(Connection connection, TableName tableName, byte [] row) { + this(connection, tableName, row, HConstants.NORMAL_QOS); + } + + public RegionServerCallable(Connection connection, TableName tableName, byte [] row, int priority) { this.connection = connection; this.tableName = tableName; this.row = row; + this.priority = priority; } /** @@ -117,6 +124,10 @@ public abstract class RegionServerCallable implements RetryingCallable { return this.row; } + public int getPriority() { + return priority; + } + @Override public void throwable(Throwable t, boolean retrying) { if (location != null) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowMutations.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowMutations.java index 888306dcb2c..c5ce4eabd27 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowMutations.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowMutations.java @@ -114,4 +114,12 @@ public class RowMutations implements Row { public List getMutations() { return Collections.unmodifiableList(mutations); } + + public int getMaxPriority() { + int maxPriority = Integer.MIN_VALUE; + for (Mutation mutation : mutations) { + maxPriority = Math.max(maxPriority, mutation.getPriority()); + } + return maxPriority; + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java index e6954cc10e1..bfae3d2288e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java @@ -38,6 +38,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseIOException; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.ServerName; @@ -100,7 +101,7 @@ public class RpcRetryingCallerWithReadReplicas { public ReplicaRegionServerCallable(int id, HRegionLocation location) { super(RpcRetryingCallerWithReadReplicas.this.cConnection, - RpcRetryingCallerWithReadReplicas.this.tableName, get.getRow()); + RpcRetryingCallerWithReadReplicas.this.tableName, get.getRow(), HConstants.PRIORITY_UNSET); this.id = id; this.location = location; this.controller = rpcControllerFactory.newController(); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java index 9b8724c9683..4efd405b72c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java @@ -278,6 +278,7 @@ public class Scan extends Query { this.mvccReadPoint = scan.getMvccReadPoint(); this.limit = scan.getLimit(); this.needCursorResult = scan.isNeedCursorResult(); + setPriority(scan.getPriority()); } /** @@ -307,6 +308,7 @@ public class Scan extends Query { setColumnFamilyTimeRange(entry.getKey(), tr.getMin(), tr.getMax()); } this.mvccReadPoint = -1L; + setPriority(get.getPriority()); } public boolean isGetScan() { @@ -1088,6 +1090,11 @@ public class Scan extends Query { return (Scan) super.setIsolationLevel(level); } + @Override + public Scan setPriority(int priority) { + return (Scan) super.setPriority(priority); + } + /** * Utility that creates a Scan that will do a small scan in reverse from passed row * looking for next closest row. diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java index caa9decf4b4..d8d6e7b0cb4 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java @@ -131,7 +131,7 @@ public class ScannerCallable extends RegionServerCallable { */ public ScannerCallable (ClusterConnection connection, TableName tableName, Scan scan, ScanMetrics scanMetrics, RpcControllerFactory rpcControllerFactory, int id) { - super(connection, tableName, scan.getStartRow()); + super(connection, tableName, scan.getStartRow(), scan.getPriority()); this.id = id; this.cConnection = connection; this.scan = scan; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java index 2c4b335d7dc..e7da60b7fbd 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java @@ -37,8 +37,6 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; @InterfaceAudience.Private public interface HBaseRpcController extends RpcController, CellScannable { - static final int PRIORITY_UNSET = -1; - /** * Only used to send cells to rpc server, the returned cells should be set by * {@link #setDone(CellScanner)}. diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java index a97647399ff..0f20c00f20a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java @@ -56,7 +56,7 @@ public class HBaseRpcControllerImpl implements HBaseRpcController { * This is the ordained way of setting priorities going forward. We will be undoing the old * annotation-based mechanism. */ - private int priority = PRIORITY_UNSET; + private int priority = HConstants.PRIORITY_UNSET; /** * They are optionally set on construction, cleared after we make the call, and then optionally @@ -95,7 +95,7 @@ public class HBaseRpcControllerImpl implements HBaseRpcController { @Override public void setPriority(int priority) { - this.priority = priority; + this.priority = Math.max(this.priority, priority); } @Override @@ -106,7 +106,7 @@ public class HBaseRpcControllerImpl implements HBaseRpcController { @Override public int getPriority() { - return priority; + return priority < 0 ? HConstants.NORMAL_QOS : priority; } @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC", diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java index 4fa58adb082..9a4a5c6d80a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java @@ -29,6 +29,7 @@ import java.net.SocketTimeoutException; import java.nio.ByteBuffer; import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.exceptions.ConnectionClosingException; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.CellBlockMeta; @@ -111,7 +112,7 @@ class IPCUtil { builder.setCellBlockMeta(cellBlockMeta); } // Only pass priority if there is one set. - if (call.priority != HBaseRpcController.PRIORITY_UNSET) { + if (call.priority != HConstants.PRIORITY_UNSET) { builder.setPriority(call.priority); } builder.setTimeout(call.timeout); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java index 0052423ff9b..f942aedb60c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java @@ -26,6 +26,7 @@ import java.io.IOException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.ClusterConnection; @@ -84,7 +85,7 @@ public class RegionCoprocessorRpcChannel extends CoprocessorRpcChannel{ final ClientProtos.CoprocessorServiceCall call = CoprocessorRpcUtils.buildServiceCall(row, method, request); RegionServerCallable callable = - new RegionServerCallable(connection, table, row) { + new RegionServerCallable(connection, table, row, HConstants.PRIORITY_UNSET) { @Override public CoprocessorServiceResponse call(int callTimeout) throws Exception { if (rpcController instanceof HBaseRpcController) { diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index 93461f91680..8df7bd89aaf 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -1114,6 +1114,7 @@ public final class HConstants { * handled by high priority handlers. */ // normal_QOS < QOS_threshold < replication_QOS < replay_QOS < admin_QOS < high_QOS + public static final int PRIORITY_UNSET = -1; public static final int NORMAL_QOS = 0; public static final int QOS_THRESHOLD = 10; public static final int HIGH_QOS = 200; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java index 1d49460a30f..f5cfa2c4acb 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java @@ -20,13 +20,16 @@ package org.apache.hadoop.hbase.client; import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import com.google.common.collect.ConcurrentHashMultiset; import com.google.common.collect.Lists; import java.io.IOException; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; +import com.google.common.collect.Multiset; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CellScannable; import org.apache.hadoop.hbase.CellScanner; @@ -72,6 +75,7 @@ public class TestRpcControllerFactory { public static class CountingRpcController extends DelegatingHBaseRpcController { + private static Multiset GROUPED_PRIORITY = ConcurrentHashMultiset.create(); private static AtomicInteger INT_PRIORITY = new AtomicInteger(); private static AtomicInteger TABLE_PRIORITY = new AtomicInteger(); @@ -81,8 +85,13 @@ public class TestRpcControllerFactory { @Override public void setPriority(int priority) { + int oldPriority = getPriority(); super.setPriority(priority); - INT_PRIORITY.incrementAndGet(); + int newPriority = getPriority(); + if (newPriority != oldPriority) { + INT_PRIORITY.incrementAndGet(); + GROUPED_PRIORITY.add(priority); + } } @Override @@ -189,6 +198,14 @@ public class TestRpcControllerFactory { scanInfo.setSmall(false); counter = doScan(table, scanInfo, counter); + // make sure we have no priority count + verifyPriorityGroupCount(HConstants.ADMIN_QOS, 0); + // lets set a custom priority on a get + Get get = new Get(row); + get.setPriority(HConstants.ADMIN_QOS); + table.get(get); + verifyPriorityGroupCount(HConstants.ADMIN_QOS, 1); + table.close(); } @@ -200,9 +217,13 @@ public class TestRpcControllerFactory { } int verifyCount(Integer counter) { - assertEquals(counter.intValue(), CountingRpcController.TABLE_PRIORITY.get()); + assertTrue(CountingRpcController.TABLE_PRIORITY.get() >= counter); assertEquals(0, CountingRpcController.INT_PRIORITY.get()); - return counter + 1; + return CountingRpcController.TABLE_PRIORITY.get() + 1; + } + + void verifyPriorityGroupCount(int priorityLevel, int count) { + assertEquals(count, CountingRpcController.GROUPED_PRIORITY.count(priorityLevel)); } @Test diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java index 1ea65fa1878..12559e7fb96 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java @@ -377,6 +377,7 @@ public class TestHeapSize { expected = ClassSize.estimateBase(cl, false); //The actual TreeMap is not included in the above calculation expected += ClassSize.align(ClassSize.TREEMAP); + expected += ClassSize.align(ClassSize.INTEGER); // priority if (expected != actual) { ClassSize.estimateBase(cl, true); assertEquals(expected, actual); @@ -387,6 +388,7 @@ public class TestHeapSize { expected = ClassSize.estimateBase(cl, false); //The actual TreeMap is not included in the above calculation expected += ClassSize.align(ClassSize.TREEMAP); + expected += ClassSize.align(ClassSize.INTEGER); // priority if (expected != actual) { ClassSize.estimateBase(cl, true); assertEquals(expected, actual);