diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java index 298a9fc3aeb..a0367de1d0f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java @@ -58,9 +58,9 @@ public class RWQueueRpcExecutor extends RpcExecutor { private final int writeHandlersCount; private final int readHandlersCount; private final int scanHandlersCount; - private final int numWriteQueues; - private final int numReadQueues; - private final int numScanQueues; + protected final int numWriteQueues; + protected final int numReadQueues; + protected final int numScanQueues; private final AtomicInteger activeWriteHandlerCount = new AtomicInteger(0); private final AtomicInteger activeReadHandlerCount = new AtomicInteger(0); @@ -97,9 +97,7 @@ public class RWQueueRpcExecutor extends RpcExecutor { numScanQueues = scanQueues; scanHandlersCount = scanHandlers; - initializeQueues(numWriteQueues); - initializeQueues(numReadQueues); - initializeQueues(numScanQueues); + initQueues(); this.writeBalancer = getBalancer(name, conf, queues.subList(0, numWriteQueues)); this.readBalancer = @@ -115,6 +113,12 @@ public class RWQueueRpcExecutor extends RpcExecutor { + numScanQueues + " scanHandlers=" + scanHandlersCount); } + protected void initQueues() { + initializeQueues(numWriteQueues); + initializeQueues(numReadQueues); + initializeQueues(numScanQueues); + } + @Override protected int computeNumCallQueues(final int handlerCount, final float callQueuesHandlersFactor) { // at least 1 read queue and 1 write queue diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java index 7e5bdfcc7d6..38092af3634 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java @@ -72,6 +72,7 @@ public abstract class RpcExecutor { public static final String CALL_QUEUE_TYPE_CODEL_CONF_VALUE = "codel"; public static final String CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE = "deadline"; public static final String CALL_QUEUE_TYPE_FIFO_CONF_VALUE = "fifo"; + public static final String CALL_QUEUE_TYPE_READ_STEAL_CONF_VALUE = "readSteal"; public static final String CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE = "pluggable"; public static final String CALL_QUEUE_TYPE_CONF_KEY = "hbase.ipc.server.callqueue.type"; public static final String CALL_QUEUE_TYPE_CONF_DEFAULT = CALL_QUEUE_TYPE_FIFO_CONF_VALUE; @@ -101,7 +102,7 @@ public abstract class RpcExecutor { private final LongAdder numLifoModeSwitches = new LongAdder(); protected final int numCallQueues; - protected final List> queues; + protected List> queues; private final Class queueClass; private final Object[] queueInitArgs; @@ -117,6 +118,8 @@ public abstract class RpcExecutor { private final Configuration conf; private final Abortable abortable; + protected int maxQueueLength; + public RpcExecutor(final String name, final int handlerCount, final int maxQueueLength, final PriorityFunction priority, final Configuration conf, final Abortable abortable) { this(name, handlerCount, conf.get(CALL_QUEUE_TYPE_CONF_KEY, CALL_QUEUE_TYPE_CONF_DEFAULT), @@ -129,6 +132,7 @@ public abstract class RpcExecutor { this.name = Strings.nullToEmpty(name); this.conf = conf; this.abortable = abortable; + this.maxQueueLength = maxQueueLength; float callQueuesHandlersFactor = this.conf.getFloat(CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 0.1f); if ( diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java index 92b38757031..8e5dff2fa8b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hbase.ipc; +import static org.apache.hadoop.hbase.ipc.RpcExecutor.CALL_QUEUE_TYPE_READ_STEAL_CONF_VALUE; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.HBaseInterfaceAudience; @@ -86,9 +88,14 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs float callqReadShare = conf.getFloat(RWQueueRpcExecutor.CALL_QUEUE_READ_SHARE_CONF_KEY, 0); if (callqReadShare > 0) { - // at least 1 read handler and 1 write handler - callExecutor = new FastPathRWQueueRpcExecutor("default.FPRWQ", Math.max(2, handlerCount), - maxQueueLength, priority, conf, server); + if (callQueueType.equals(CALL_QUEUE_TYPE_READ_STEAL_CONF_VALUE)) { + callExecutor = new StealReadJobRWQueueRpcExecutor("default.SRRWQ", + Math.max(2, handlerCount), maxQueueLength, priority, conf, server); + } else { + // at least 1 read handler and 1 write handler + callExecutor = new FastPathRWQueueRpcExecutor("default.FPRWQ", Math.max(2, handlerCount), + maxQueueLength, priority, conf, server); + } } else { if ( RpcExecutor.isFifoQueueType(callQueueType) || RpcExecutor.isCodelQueueType(callQueueType) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/StealReadJobRWQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/StealReadJobRWQueueRpcExecutor.java new file mode 100644 index 00000000000..c50df374c42 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/StealReadJobRWQueueRpcExecutor.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.yetus.audience.InterfaceStability; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX }) +@InterfaceStability.Evolving +public class StealReadJobRWQueueRpcExecutor extends RWQueueRpcExecutor { + private static final Logger LOG = LoggerFactory.getLogger(StealReadJobRWQueueRpcExecutor.class); + + public StealReadJobRWQueueRpcExecutor(String name, int handlerCount, int maxQueueLength, + PriorityFunction priority, Configuration conf, Abortable abortable) { + super(name, handlerCount, maxQueueLength, priority, conf, abortable); + } + + @Override + public void initQueues() { + queues = new ArrayList<>(this.numWriteQueues + this.numReadQueues + numScanQueues); + initializeQueues(numWriteQueues); + if (numReadQueues > 0 && numScanQueues > 0) { + int stealQueueCount = Math.min(numReadQueues, numScanQueues); + List> stealScanQueues = new ArrayList<>(stealQueueCount); + for (int i = 0; i < stealQueueCount; i++) { + StealRpcJobQueue scanQueue = new StealRpcJobQueue(maxQueueLength, maxQueueLength); + BlockingQueue readQueue = scanQueue.getStealFromQueue(); + queues.add(readQueue); + stealScanQueues.add(scanQueue); + } + if (numReadQueues > numScanQueues) { + initializeQueues(numReadQueues - numScanQueues); + } + queues.addAll(stealScanQueues); + if (numScanQueues > numReadQueues) { + initializeQueues(numScanQueues - numReadQueues); + } + } else { + initializeQueues(Math.max(numReadQueues, numScanQueues)); + } + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/StealRpcJobQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/StealRpcJobQueue.java new file mode 100644 index 00000000000..92e693f9e90 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/StealRpcJobQueue.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import java.util.Comparator; +import org.apache.hadoop.hbase.util.StealJobQueue; +import org.apache.yetus.audience.InterfaceAudience; + +@InterfaceAudience.Private +public class StealRpcJobQueue extends StealJobQueue { + + public final static RpcComparator RPCCOMPARATOR = new RpcComparator(); + + public StealRpcJobQueue(int initCapacity, int stealFromQueueInitCapacity) { + super(initCapacity, stealFromQueueInitCapacity, RPCCOMPARATOR); + } + + public static class RpcComparator implements Comparator { + public RpcComparator() { + super(); + } + + @Override + public int compare(CallRunner o1, CallRunner o2) { + long diff = o1.getRpcCall().getReceiveTime() - o2.getRpcCall().getReceiveTime(); + if (diff > 0) { + return 1; + } else if (diff < 0) { + return -1; + } else { + return 0; + } + } + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/StealJobQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/StealJobQueue.java index 7b41331abeb..d9bf8cf0019 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/StealJobQueue.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/StealJobQueue.java @@ -49,6 +49,10 @@ public class StealJobQueue extends PriorityBlockingQueue { this(11, 11, comparator); } + public StealJobQueue(int initCapacity, int stealFromQueueInitCapacity) { + this(initCapacity, stealFromQueueInitCapacity, null); + } + public StealJobQueue(int initCapacity, int stealFromQueueInitCapacity, Comparator comparator) { super(initCapacity, comparator); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java index 96ece59505a..1655f00ca2f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java @@ -65,17 +65,17 @@ public class TestMultiParallel { private static final Logger LOG = LoggerFactory.getLogger(TestMultiParallel.class); - private static final HBaseTestingUtil UTIL = new HBaseTestingUtil(); + protected static final HBaseTestingUtil UTIL = new HBaseTestingUtil(); private static final byte[] VALUE = Bytes.toBytes("value"); private static final byte[] QUALIFIER = Bytes.toBytes("qual"); - private static final String FAMILY = "family"; - private static final TableName TEST_TABLE = TableName.valueOf("multi_test_table"); + protected static final String FAMILY = "family"; + protected static final TableName TEST_TABLE = TableName.valueOf("multi_test_table"); private static final byte[] BYTES_FAMILY = Bytes.toBytes(FAMILY); private static final byte[] ONE_ROW = Bytes.toBytes("xxx"); private static final byte[][] KEYS = makeKeys(); - private static final int slaves = 5; // also used for testing HTable pool size - private static Connection CONNECTION; + protected static final int slaves = 5; // also used for testing HTable pool size + protected static Connection CONNECTION; @BeforeClass public static void beforeClass() throws Exception { @@ -662,7 +662,7 @@ public class TestMultiParallel { public static class MyMasterObserver implements MasterObserver, MasterCoprocessor { private static final AtomicInteger postBalanceCount = new AtomicInteger(0); - private static final AtomicBoolean start = new AtomicBoolean(false); + protected static final AtomicBoolean start = new AtomicBoolean(false); @Override public void start(CoprocessorEnvironment env) throws IOException { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel2.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel2.java new file mode 100644 index 00000000000..0db21da769e --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel2.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertTrue; + +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.codec.KeyValueCodec; +import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; +import org.apache.hadoop.hbase.ipc.RWQueueRpcExecutor; +import org.apache.hadoop.hbase.ipc.RpcExecutor; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ MediumTests.class }) +public class TestMultiParallel2 extends TestMultiParallel { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestMultiParallel2.class); + + public static void beforeClass() throws Exception { + // Uncomment the following lines if more verbosity is needed for + // debugging (see HBASE-12285 for details). + // ((Log4JLogger)RpcServer.LOG).getLogger().setLevel(Level.ALL); + // ((Log4JLogger)RpcClient.LOG).getLogger().setLevel(Level.ALL); + // ((Log4JLogger)ScannerCallable.LOG).getLogger().setLevel(Level.ALL); + UTIL.getConfiguration().set(HConstants.RPC_CODEC_CONF_KEY, + KeyValueCodec.class.getCanonicalName()); + // Disable table on master for now as the feature is broken + // UTIL.getConfiguration().setBoolean(LoadBalancer.TABLES_ON_MASTER, true); + // We used to ask for system tables on Master exclusively but not needed by test and doesn't + // work anyways -- so commented out. + // UTIL.getConfiguration().setBoolean(LoadBalancer.SYSTEM_TABLES_ON_MASTER, true); + UTIL.getConfiguration().set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, + MyMasterObserver.class.getName()); + String queueType = RpcExecutor.CALL_QUEUE_TYPE_READ_STEAL_CONF_VALUE; + UTIL.getConfiguration().set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, queueType); + UTIL.getConfiguration().setFloat(RWQueueRpcExecutor.CALL_QUEUE_READ_SHARE_CONF_KEY, 0.5f); + UTIL.getConfiguration().setFloat(RWQueueRpcExecutor.CALL_QUEUE_SCAN_SHARE_CONF_KEY, 1); + UTIL.startMiniCluster(slaves); + Table t = UTIL.createMultiRegionTable(TEST_TABLE, Bytes.toBytes(FAMILY)); + UTIL.waitTableEnabled(TEST_TABLE); + t.close(); + CONNECTION = ConnectionFactory.createConnection(UTIL.getConfiguration()); + assertTrue(MyMasterObserver.start.get()); + } + + @Test + public void test() throws Exception { + testBatchWithGet(); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide2.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide2.java index 36e1f4ad50e..477037d5f69 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide2.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide2.java @@ -52,15 +52,15 @@ public class TestScannersFromClientSide2 { public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestScannersFromClientSide2.class); - private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); + protected static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); - private static TableName TABLE_NAME = TableName.valueOf("scan"); + protected static TableName TABLE_NAME = TableName.valueOf("scan"); - private static byte[] FAMILY = Bytes.toBytes("cf"); + protected static byte[] FAMILY = Bytes.toBytes("cf"); - private static byte[] CQ1 = Bytes.toBytes("cq1"); + protected static byte[] CQ1 = Bytes.toBytes("cq1"); - private static byte[] CQ2 = Bytes.toBytes("cq2"); + protected static byte[] CQ2 = Bytes.toBytes("cq2"); @Parameter(0) public boolean batch; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide3.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide3.java new file mode 100644 index 00000000000..6ede6c218cd --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide3.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.util.ArrayList; +import java.util.List; +import org.apache.hadoop.hbase.ipc.RWQueueRpcExecutor; +import org.apache.hadoop.hbase.ipc.RpcExecutor; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ LargeTests.class, ClientTests.class }) +public class TestScannersFromClientSide3 extends TestScannersFromClientSide2 { + + @BeforeClass + public static void setUp() throws Exception { + String queueType = RpcExecutor.CALL_QUEUE_TYPE_READ_STEAL_CONF_VALUE; + TEST_UTIL.getConfiguration().set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, queueType); + TEST_UTIL.getConfiguration().setFloat(RWQueueRpcExecutor.CALL_QUEUE_READ_SHARE_CONF_KEY, 0.5f); + TEST_UTIL.getConfiguration().setFloat(RWQueueRpcExecutor.CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0.5f); + TEST_UTIL.startMiniCluster(3); + byte[][] splitKeys = new byte[8][]; + for (int i = 111; i < 999; i += 111) { + splitKeys[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i)); + } + Table table = TEST_UTIL.createTable(TABLE_NAME, FAMILY, splitKeys); + List puts = new ArrayList<>(); + for (int i = 0; i < 1000; i++) { + puts.add(new Put(Bytes.toBytes(String.format("%03d", i))) + .addColumn(FAMILY, CQ1, Bytes.toBytes(i)).addColumn(FAMILY, CQ2, Bytes.toBytes(i * i))); + } + TEST_UTIL.waitTableAvailable(TABLE_NAME); + table.put(puts); + } + + @Test + public void test() throws Exception { + testScanWithLimit(); + testReversedScanWithLimit(); + testReversedStartRowStopRowInclusive(); + testStartRowStopRowInclusive(); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java index 19aa46a0d62..1a38756c1b9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java @@ -810,4 +810,21 @@ public class TestSimpleRpcScheduler { } }; } + + @Test + public void testStealReadRWQueue() throws Exception { + String queueType = RpcExecutor.CALL_QUEUE_TYPE_READ_STEAL_CONF_VALUE; + Configuration schedConf = HBaseConfiguration.create(); + schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, queueType); + schedConf.setFloat(RWQueueRpcExecutor.CALL_QUEUE_READ_SHARE_CONF_KEY, 0.1f); + + PriorityFunction priority = mock(PriorityFunction.class); + when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS); + SimpleRpcScheduler scheduler = + new SimpleRpcScheduler(schedConf, 0, 0, 0, priority, HConstants.QOS_THRESHOLD); + + Field f = scheduler.getClass().getDeclaredField("callExecutor"); + f.setAccessible(true); + assertTrue(f.get(scheduler) instanceof StealReadJobRWQueueRpcExecutor); + } }