diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/OnlineLogRecord.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/OnlineLogRecord.java index 3ea3061a62f..65e2f58f452 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/OnlineLogRecord.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/OnlineLogRecord.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.client; +import java.util.Optional; import org.apache.commons.lang3.builder.EqualsBuilder; import org.apache.commons.lang3.builder.HashCodeBuilder; import org.apache.commons.lang3.builder.ToStringBuilder; @@ -52,6 +53,11 @@ final public class OnlineLogRecord extends LogEntry { if (slowLogPayload.getMultiServiceCalls() == 0) { jsonObj.remove("multiServiceCalls"); } + if (slowLogPayload.getScan().isPresent()) { + jsonObj.add("scan", gson.toJsonTree(slowLogPayload.getScan().get().toMap())); + } else { + jsonObj.remove("scan"); + } return jsonObj; }).create(); @@ -72,6 +78,7 @@ final public class OnlineLogRecord extends LogEntry { private final int multiGetsCount; private final int multiMutationsCount; private final int multiServiceCalls; + private final Optional scan; public long getStartTime() { return startTime; @@ -136,11 +143,20 @@ final public class OnlineLogRecord extends LogEntry { return multiServiceCalls; } - private OnlineLogRecord(final long startTime, final int processingTime, final int queueTime, + /** + * If {@value org.apache.hadoop.hbase.HConstants#SLOW_LOG_SCAN_PAYLOAD_ENABLED} is enabled then + * this value may be present and should represent the Scan that produced the given + * {@link OnlineLogRecord} + */ + public Optional getScan() { + return scan; + } + + OnlineLogRecord(final long startTime, final int processingTime, final int queueTime, final long responseSize, final long blockBytesScanned, final String clientAddress, final String serverClass, final String methodName, final String callDetails, final String param, final String regionName, final String userName, final int multiGetsCount, - final int multiMutationsCount, final int multiServiceCalls) { + final int multiMutationsCount, final int multiServiceCalls, final Scan scan) { this.startTime = startTime; this.processingTime = processingTime; this.queueTime = queueTime; @@ -156,6 +172,7 @@ final public class OnlineLogRecord extends LogEntry { this.multiGetsCount = multiGetsCount; this.multiMutationsCount = multiMutationsCount; this.multiServiceCalls = multiServiceCalls; + this.scan = Optional.ofNullable(scan); } public static class OnlineLogRecordBuilder { @@ -174,6 +191,7 @@ final public class OnlineLogRecord extends LogEntry { private int multiGetsCount; private int multiMutationsCount; private int multiServiceCalls; + private Scan scan = null; public OnlineLogRecordBuilder setStartTime(long startTime) { this.startTime = startTime; @@ -253,10 +271,15 @@ final public class OnlineLogRecord extends LogEntry { return this; } + public OnlineLogRecordBuilder setScan(Scan scan) { + this.scan = scan; + return this; + } + public OnlineLogRecord build() { return new OnlineLogRecord(startTime, processingTime, queueTime, responseSize, blockBytesScanned, clientAddress, serverClass, methodName, callDetails, param, regionName, - userName, multiGetsCount, multiMutationsCount, multiServiceCalls); + userName, multiGetsCount, multiMutationsCount, multiServiceCalls, scan); } } @@ -280,7 +303,8 @@ final public class OnlineLogRecord extends LogEntry { .append(multiServiceCalls, that.multiServiceCalls).append(clientAddress, that.clientAddress) .append(serverClass, that.serverClass).append(methodName, that.methodName) .append(callDetails, that.callDetails).append(param, that.param) - .append(regionName, that.regionName).append(userName, that.userName).isEquals(); + .append(regionName, that.regionName).append(userName, that.userName).append(scan, that.scan) + .isEquals(); } @Override @@ -288,7 +312,8 @@ final public class OnlineLogRecord extends LogEntry { return new HashCodeBuilder(17, 37).append(startTime).append(processingTime).append(queueTime) .append(responseSize).append(blockBytesScanned).append(clientAddress).append(serverClass) .append(methodName).append(callDetails).append(param).append(regionName).append(userName) - .append(multiGetsCount).append(multiMutationsCount).append(multiServiceCalls).toHashCode(); + .append(multiGetsCount).append(multiMutationsCount).append(multiServiceCalls).append(scan) + .toHashCode(); } @Override @@ -305,7 +330,7 @@ final public class OnlineLogRecord extends LogEntry { .append("methodName", methodName).append("callDetails", callDetails).append("param", param) .append("regionName", regionName).append("userName", userName) .append("multiGetsCount", multiGetsCount).append("multiMutationsCount", multiMutationsCount) - .append("multiServiceCalls", multiServiceCalls).toString(); + .append("multiServiceCalls", multiServiceCalls).append("scan", scan).toString(); } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SlowLogParams.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SlowLogParams.java index b1460c0b116..6af7c42c26d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SlowLogParams.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SlowLogParams.java @@ -23,6 +23,8 @@ import org.apache.commons.lang3.builder.HashCodeBuilder; import org.apache.commons.lang3.builder.ToStringBuilder; import org.apache.yetus.audience.InterfaceAudience; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; + /** * SlowLog params object that contains detailed info as params and region name : to be used for * filter purpose @@ -32,15 +34,24 @@ public class SlowLogParams { private final String regionName; private final String params; + private final ClientProtos.Scan scan; + + public SlowLogParams(String regionName, String params, ClientProtos.Scan scan) { + this.regionName = regionName; + this.params = params; + this.scan = scan; + } public SlowLogParams(String regionName, String params) { this.regionName = regionName; this.params = params; + this.scan = null; } public SlowLogParams(String params) { this.regionName = StringUtils.EMPTY; this.params = params; + this.scan = null; } public String getRegionName() { @@ -51,10 +62,14 @@ public class SlowLogParams { return params; } + public ClientProtos.Scan getScan() { + return scan; + } + @Override public String toString() { return new ToStringBuilder(this).append("regionName", regionName).append("params", params) - .toString(); + .append("scan", scan).toString(); } @Override @@ -67,11 +82,11 @@ public class SlowLogParams { } SlowLogParams that = (SlowLogParams) o; return new EqualsBuilder().append(regionName, that.regionName).append(params, that.params) - .isEquals(); + .append("scan", scan).isEquals(); } @Override public int hashCode() { - return new HashCodeBuilder(17, 37).append(regionName).append(params).toHashCode(); + return new HashCodeBuilder(17, 37).append(regionName).append(params).append(scan).toHashCode(); } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java index 34ba5c58511..4722610abf7 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java @@ -126,6 +126,8 @@ import org.apache.hadoop.hbase.util.Methods; import org.apache.hadoop.hbase.util.VersionInfo; import org.apache.hadoop.ipc.RemoteException; import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.hbase.thirdparty.com.google.common.io.ByteStreams; import org.apache.hbase.thirdparty.com.google.gson.JsonArray; @@ -226,6 +228,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos; @InterfaceAudience.Private // TODO: some clients (Hive, etc) use this class public final class ProtobufUtil { + private static final Logger LOG = LoggerFactory.getLogger(ProtobufUtil.class.getName()); + private ProtobufUtil() { } @@ -2111,7 +2115,7 @@ public final class ProtobufUtil { * @param message Message object {@link Message} * @return SlowLogParams with regionName(for filter queries) and params */ - public static SlowLogParams getSlowLogParams(Message message) { + public static SlowLogParams getSlowLogParams(Message message, boolean slowLogScanPayloadEnabled) { if (message == null) { return null; } @@ -2119,7 +2123,11 @@ public final class ProtobufUtil { ScanRequest scanRequest = (ScanRequest) message; String regionName = getStringForByteString(scanRequest.getRegion().getValue()); String params = TextFormat.shortDebugString(message); - return new SlowLogParams(regionName, params); + if (slowLogScanPayloadEnabled) { + return new SlowLogParams(regionName, params, scanRequest.getScan()); + } else { + return new SlowLogParams(regionName, params); + } } else if (message instanceof MutationProto) { MutationProto mutationProto = (MutationProto) message; String params = "type= " + mutationProto.getMutateType().toString(); @@ -3326,7 +3334,7 @@ public final class ProtobufUtil { * @return SlowLog Payload for client usecase */ private static LogEntry getSlowLogRecord(final TooSlowLog.SlowLogPayload slowLogPayload) { - OnlineLogRecord onlineLogRecord = + OnlineLogRecord.OnlineLogRecordBuilder onlineLogRecord = new OnlineLogRecord.OnlineLogRecordBuilder().setCallDetails(slowLogPayload.getCallDetails()) .setClientAddress(slowLogPayload.getClientAddress()) .setMethodName(slowLogPayload.getMethodName()) @@ -3338,8 +3346,15 @@ public final class ProtobufUtil { .setResponseSize(slowLogPayload.getResponseSize()) .setBlockBytesScanned(slowLogPayload.getBlockBytesScanned()) .setServerClass(slowLogPayload.getServerClass()).setStartTime(slowLogPayload.getStartTime()) - .setUserName(slowLogPayload.getUserName()).build(); - return onlineLogRecord; + .setUserName(slowLogPayload.getUserName()); + if (slowLogPayload.hasScan()) { + try { + onlineLogRecord.setScan(ProtobufUtil.toScan(slowLogPayload.getScan())); + } catch (Exception e) { + LOG.warn("Failed to convert Scan proto {}", slowLogPayload.getScan(), e); + } + } + return onlineLogRecord.build(); } /** diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestOnlineLogRecord.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestOnlineLogRecord.java new file mode 100644 index 00000000000..846738d8298 --- /dev/null +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestOnlineLogRecord.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.Assert; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ ClientTests.class, SmallTests.class }) +public class TestOnlineLogRecord { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestOnlineLogRecord.class); + + @Test + public void itSerializesScan() { + Scan scan = new Scan(); + scan.withStartRow(Bytes.toBytes(123)); + scan.withStopRow(Bytes.toBytes(456)); + String expectedOutput = "{\n" + " \"startTime\": 1,\n" + " \"processingTime\": 2,\n" + + " \"queueTime\": 3,\n" + " \"responseSize\": 4,\n" + " \"blockBytesScanned\": 5,\n" + + " \"multiGetsCount\": 6,\n" + " \"multiMutationsCount\": 7,\n" + " \"scan\": {\n" + + " \"startRow\": \"\\\\x00\\\\x00\\\\x00{\",\n" + + " \"stopRow\": \"\\\\x00\\\\x00\\\\x01\\\\xC8\",\n" + " \"batch\": -1,\n" + + " \"cacheBlocks\": true,\n" + " \"totalColumns\": 0,\n" + + " \"maxResultSize\": -1,\n" + " \"families\": {},\n" + " \"caching\": -1,\n" + + " \"maxVersions\": 1,\n" + " \"timeRange\": [\n" + " 0,\n" + + " 9223372036854775807\n" + " ]\n" + " }\n" + "}"; + OnlineLogRecord o = + new OnlineLogRecord(1, 2, 3, 4, 5, null, null, null, null, null, null, null, 6, 7, 0, scan); + String actualOutput = o.toJsonPrettyPrint(); + System.out.println(actualOutput); + Assert.assertEquals(actualOutput, expectedOutput); + } +} diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index 054aa603bc5..c3b6d3bbb81 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -1656,6 +1656,9 @@ public final class HConstants { // Default 10 mins. public static final int DEFAULT_SLOW_LOG_SYS_TABLE_CHORE_DURATION = 10 * 60 * 1000; + public static final String SLOW_LOG_SCAN_PAYLOAD_ENABLED = "hbase.slowlog.scan.payload.enabled"; + public static final boolean SLOW_LOG_SCAN_PAYLOAD_ENABLED_DEFAULT = false; + public static final String SHELL_TIMESTAMP_FORMAT_EPOCH_KEY = "hbase.shell.timestamp.format.epoch"; diff --git a/hbase-protocol-shaded/src/main/protobuf/TooSlowLog.proto b/hbase-protocol-shaded/src/main/protobuf/TooSlowLog.proto index 4ed0324e5a8..e6194851947 100644 --- a/hbase-protocol-shaded/src/main/protobuf/TooSlowLog.proto +++ b/hbase-protocol-shaded/src/main/protobuf/TooSlowLog.proto @@ -27,6 +27,8 @@ option java_outer_classname = "TooSlowLog"; option java_generate_equals_and_hash = true; option optimize_for = SPEED; +import "Client.proto"; + message SlowLogPayload { required int64 start_time = 1; required int32 processing_time = 2; @@ -45,6 +47,7 @@ message SlowLogPayload { required Type type = 15; optional int64 block_bytes_scanned = 16; + optional Scan scan = 17; // SLOW_LOG is RPC call slow in nature whereas LARGE_LOG is RPC call quite large. // Majority of times, slow logs are also large logs and hence, ALL is combination of diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/impl/SlowLogQueueService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/impl/SlowLogQueueService.java index fb002fdf3e8..48121a8b066 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/impl/SlowLogQueueService.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/impl/SlowLogQueueService.java @@ -65,10 +65,13 @@ public class SlowLogQueueService implements NamedQueueService { private final boolean isSlowLogTableEnabled; private final SlowLogPersistentService slowLogPersistentService; private final Queue slowLogQueue; + private final boolean slowLogScanPayloadEnabled; public SlowLogQueueService(Configuration conf) { this.isOnlineLogProviderEnabled = conf.getBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, HConstants.DEFAULT_ONLINE_LOG_PROVIDER_ENABLED); + this.slowLogScanPayloadEnabled = conf.getBoolean(HConstants.SLOW_LOG_SCAN_PAYLOAD_ENABLED, + HConstants.SLOW_LOG_SCAN_PAYLOAD_ENABLED_DEFAULT); if (!isOnlineLogProviderEnabled) { this.isSlowLogTableEnabled = false; @@ -129,7 +132,8 @@ public class SlowLogQueueService implements NamedQueueService { long endTime = EnvironmentEdgeManager.currentTime(); int processingTime = (int) (endTime - startTime); int qTime = (int) (startTime - receiveTime); - final SlowLogParams slowLogParams = ProtobufUtil.getSlowLogParams(param); + final SlowLogParams slowLogParams = + ProtobufUtil.getSlowLogParams(param, slowLogScanPayloadEnabled); int numGets = 0; int numMutations = 0; int numServiceCalls = 0; @@ -152,7 +156,7 @@ public class SlowLogQueueService implements NamedQueueService { final String userName = rpcCall.getRequestUserName().orElse(StringUtils.EMPTY); final String methodDescriptorName = methodDescriptor != null ? methodDescriptor.getName() : StringUtils.EMPTY; - TooSlowLog.SlowLogPayload slowLogPayload = TooSlowLog.SlowLogPayload.newBuilder() + TooSlowLog.SlowLogPayload.Builder slowLogPayloadBuilder = TooSlowLog.SlowLogPayload.newBuilder() .setCallDetails(methodDescriptorName + "(" + param.getClass().getName() + ")") .setClientAddress(clientAddress).setMethodName(methodDescriptorName).setMultiGets(numGets) .setMultiMutations(numMutations).setMultiServiceCalls(numServiceCalls) @@ -160,8 +164,11 @@ public class SlowLogQueueService implements NamedQueueService { .setProcessingTime(processingTime).setQueueTime(qTime) .setRegionName(slowLogParams != null ? slowLogParams.getRegionName() : StringUtils.EMPTY) .setResponseSize(responseSize).setBlockBytesScanned(blockBytesScanned) - .setServerClass(className).setStartTime(startTime).setType(type).setUserName(userName) - .build(); + .setServerClass(className).setStartTime(startTime).setType(type).setUserName(userName); + if (slowLogParams != null && slowLogParams.getScan() != null) { + slowLogPayloadBuilder.setScan(slowLogParams.getScan()); + } + TooSlowLog.SlowLogPayload slowLogPayload = slowLogPayloadBuilder.build(); slowLogQueue.add(slowLogPayload); if (isSlowLogTableEnabled) { if (!slowLogPayload.getRegionName().startsWith("hbase:slowlog")) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestNamedQueueRecorder.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestNamedQueueRecorder.java index d9970773dc7..ef91779e699 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestNamedQueueRecorder.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestNamedQueueRecorder.java @@ -527,12 +527,96 @@ public class TestNamedQueueRecorder { HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(requestSlowLog).size() == 15)); } + @Test + public void testOnlineSlowLogScanPayloadDefaultDisabled() throws Exception { + Configuration conf = applySlowLogRecorderConf(1); + conf.unset(HConstants.SLOW_LOG_SCAN_PAYLOAD_ENABLED); + Constructor constructor = + NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class); + constructor.setAccessible(true); + namedQueueRecorder = constructor.newInstance(conf); + AdminProtos.SlowLogResponseRequest request = + AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(1).build(); + + Assert.assertEquals(getSlowLogPayloads(request).size(), 0); + LOG.debug("Initially ringbuffer of Slow Log records is empty"); + RpcLogDetails rpcLogDetails = getRpcLogDetailsOfScan(); + namedQueueRecorder.addRecord(rpcLogDetails); + Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> { + Optional slowLogPayload = getSlowLogPayloads(request).stream().findAny(); + if (slowLogPayload.isPresent()) { + return !slowLogPayload.get().hasScan(); + } + return false; + })); + } + + @Test + public void testOnlineSlowLogScanPayloadExplicitlyDisabled() throws Exception { + Configuration conf = applySlowLogRecorderConf(1); + conf.setBoolean(HConstants.SLOW_LOG_SCAN_PAYLOAD_ENABLED, false); + Constructor constructor = + NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class); + constructor.setAccessible(true); + namedQueueRecorder = constructor.newInstance(conf); + AdminProtos.SlowLogResponseRequest request = + AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(1).build(); + + Assert.assertEquals(getSlowLogPayloads(request).size(), 0); + LOG.debug("Initially ringbuffer of Slow Log records is empty"); + RpcLogDetails rpcLogDetails = getRpcLogDetailsOfScan(); + namedQueueRecorder.addRecord(rpcLogDetails); + Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> { + Optional slowLogPayload = getSlowLogPayloads(request).stream().findAny(); + if (slowLogPayload.isPresent()) { + return !slowLogPayload.get().hasScan(); + } + return false; + })); + } + + @Test + public void testOnlineSlowLogScanPayloadExplicitlyEnabled() throws Exception { + Configuration conf = applySlowLogRecorderConf(1); + conf.setBoolean(HConstants.SLOW_LOG_SCAN_PAYLOAD_ENABLED, true); + Constructor constructor = + NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class); + constructor.setAccessible(true); + namedQueueRecorder = constructor.newInstance(conf); + AdminProtos.SlowLogResponseRequest request = + AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(1).build(); + + Assert.assertEquals(getSlowLogPayloads(request).size(), 0); + LOG.debug("Initially ringbuffer of Slow Log records is empty"); + RpcLogDetails rpcLogDetails = getRpcLogDetailsOfScan(); + namedQueueRecorder.addRecord(rpcLogDetails); + Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> { + Optional slowLogPayload = getSlowLogPayloads(request).stream().findAny(); + if (slowLogPayload.isPresent()) { + return slowLogPayload.get().hasScan(); + } + return false; + })); + } + + static RpcLogDetails getRpcLogDetails(String userName, String clientAddress, String className, + int forcedParamIndex) { + RpcCall rpcCall = getRpcCall(userName, forcedParamIndex); + return new RpcLogDetails(rpcCall, rpcCall.getParam(), clientAddress, 0, 0, className, true, + true); + } + static RpcLogDetails getRpcLogDetails(String userName, String clientAddress, String className) { RpcCall rpcCall = getRpcCall(userName); return new RpcLogDetails(rpcCall, rpcCall.getParam(), clientAddress, 0, 0, className, true, true); } + private static RpcLogDetails getRpcLogDetailsOfScan() { + // forcedParamIndex of 0 results in a ScanRequest + return getRpcLogDetails("userName_1", "client_1", "class_1", 0); + } + private RpcLogDetails getRpcLogDetails(String userName, String clientAddress, String className, boolean isSlowLog, boolean isLargeLog) { RpcCall rpcCall = getRpcCall(userName); @@ -541,6 +625,14 @@ public class TestNamedQueueRecorder { } private static RpcCall getRpcCall(String userName) { + return getRpcCall(userName, Optional.empty()); + } + + private static RpcCall getRpcCall(String userName, int forcedParamIndex) { + return getRpcCall(userName, Optional.of(forcedParamIndex)); + } + + private static RpcCall getRpcCall(String userName, Optional forcedParamIndex) { RpcCall rpcCall = new RpcCall() { @Override public BlockingService getService() { @@ -554,7 +646,7 @@ public class TestNamedQueueRecorder { @Override public Message getParam() { - return getMessage(); + return getMessage(forcedParamIndex); } @Override @@ -689,13 +781,13 @@ public class TestNamedQueueRecorder { return rpcCall; } - private static Message getMessage() { + private static Message getMessage(Optional forcedParamIndex) { i = (i + 1) % 3; Message message = null; - switch (i) { + switch (forcedParamIndex.orElse(i)) { case 0: { message = ClientProtos.ScanRequest.newBuilder()