HBASE-27766 Support steal job queue mode for read RPC queues of RWQueueRpcExecutor

This commit is contained in:
haxiaolin 2023-05-30 17:33:50 +08:00
parent 79c985f246
commit db1cc1d6e3
11 changed files with 306 additions and 21 deletions

View File

@ -58,9 +58,9 @@ public class RWQueueRpcExecutor extends RpcExecutor {
private final int writeHandlersCount; private final int writeHandlersCount;
private final int readHandlersCount; private final int readHandlersCount;
private final int scanHandlersCount; private final int scanHandlersCount;
private final int numWriteQueues; protected final int numWriteQueues;
private final int numReadQueues; protected final int numReadQueues;
private final int numScanQueues; protected final int numScanQueues;
private final AtomicInteger activeWriteHandlerCount = new AtomicInteger(0); private final AtomicInteger activeWriteHandlerCount = new AtomicInteger(0);
private final AtomicInteger activeReadHandlerCount = new AtomicInteger(0); private final AtomicInteger activeReadHandlerCount = new AtomicInteger(0);
@ -97,9 +97,7 @@ public class RWQueueRpcExecutor extends RpcExecutor {
numScanQueues = scanQueues; numScanQueues = scanQueues;
scanHandlersCount = scanHandlers; scanHandlersCount = scanHandlers;
initializeQueues(numWriteQueues); initQueues();
initializeQueues(numReadQueues);
initializeQueues(numScanQueues);
this.writeBalancer = getBalancer(name, conf, queues.subList(0, numWriteQueues)); this.writeBalancer = getBalancer(name, conf, queues.subList(0, numWriteQueues));
this.readBalancer = this.readBalancer =
@ -115,6 +113,12 @@ public class RWQueueRpcExecutor extends RpcExecutor {
+ numScanQueues + " scanHandlers=" + scanHandlersCount); + numScanQueues + " scanHandlers=" + scanHandlersCount);
} }
protected void initQueues() {
initializeQueues(numWriteQueues);
initializeQueues(numReadQueues);
initializeQueues(numScanQueues);
}
@Override @Override
protected int computeNumCallQueues(final int handlerCount, final float callQueuesHandlersFactor) { protected int computeNumCallQueues(final int handlerCount, final float callQueuesHandlersFactor) {
// at least 1 read queue and 1 write queue // at least 1 read queue and 1 write queue

View File

@ -72,6 +72,7 @@ public abstract class RpcExecutor {
public static final String CALL_QUEUE_TYPE_CODEL_CONF_VALUE = "codel"; public static final String CALL_QUEUE_TYPE_CODEL_CONF_VALUE = "codel";
public static final String CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE = "deadline"; public static final String CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE = "deadline";
public static final String CALL_QUEUE_TYPE_FIFO_CONF_VALUE = "fifo"; public static final String CALL_QUEUE_TYPE_FIFO_CONF_VALUE = "fifo";
public static final String CALL_QUEUE_TYPE_READ_STEAL_CONF_VALUE = "readSteal";
public static final String CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE = "pluggable"; public static final String CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE = "pluggable";
public static final String CALL_QUEUE_TYPE_CONF_KEY = "hbase.ipc.server.callqueue.type"; public static final String CALL_QUEUE_TYPE_CONF_KEY = "hbase.ipc.server.callqueue.type";
public static final String CALL_QUEUE_TYPE_CONF_DEFAULT = CALL_QUEUE_TYPE_FIFO_CONF_VALUE; public static final String CALL_QUEUE_TYPE_CONF_DEFAULT = CALL_QUEUE_TYPE_FIFO_CONF_VALUE;
@ -101,7 +102,7 @@ public abstract class RpcExecutor {
private final LongAdder numLifoModeSwitches = new LongAdder(); private final LongAdder numLifoModeSwitches = new LongAdder();
protected final int numCallQueues; protected final int numCallQueues;
protected final List<BlockingQueue<CallRunner>> queues; protected List<BlockingQueue<CallRunner>> queues;
private final Class<? extends BlockingQueue> queueClass; private final Class<? extends BlockingQueue> queueClass;
private final Object[] queueInitArgs; private final Object[] queueInitArgs;
@ -117,6 +118,8 @@ public abstract class RpcExecutor {
private final Configuration conf; private final Configuration conf;
private final Abortable abortable; private final Abortable abortable;
protected int maxQueueLength;
public RpcExecutor(final String name, final int handlerCount, final int maxQueueLength, public RpcExecutor(final String name, final int handlerCount, final int maxQueueLength,
final PriorityFunction priority, final Configuration conf, final Abortable abortable) { final PriorityFunction priority, final Configuration conf, final Abortable abortable) {
this(name, handlerCount, conf.get(CALL_QUEUE_TYPE_CONF_KEY, CALL_QUEUE_TYPE_CONF_DEFAULT), this(name, handlerCount, conf.get(CALL_QUEUE_TYPE_CONF_KEY, CALL_QUEUE_TYPE_CONF_DEFAULT),
@ -129,6 +132,7 @@ public abstract class RpcExecutor {
this.name = Strings.nullToEmpty(name); this.name = Strings.nullToEmpty(name);
this.conf = conf; this.conf = conf;
this.abortable = abortable; this.abortable = abortable;
this.maxQueueLength = maxQueueLength;
float callQueuesHandlersFactor = this.conf.getFloat(CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 0.1f); float callQueuesHandlersFactor = this.conf.getFloat(CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 0.1f);
if ( if (

View File

@ -17,6 +17,8 @@
*/ */
package org.apache.hadoop.hbase.ipc; package org.apache.hadoop.hbase.ipc;
import static org.apache.hadoop.hbase.ipc.RpcExecutor.CALL_QUEUE_TYPE_READ_STEAL_CONF_VALUE;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HBaseInterfaceAudience;
@ -86,9 +88,14 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
float callqReadShare = conf.getFloat(RWQueueRpcExecutor.CALL_QUEUE_READ_SHARE_CONF_KEY, 0); float callqReadShare = conf.getFloat(RWQueueRpcExecutor.CALL_QUEUE_READ_SHARE_CONF_KEY, 0);
if (callqReadShare > 0) { if (callqReadShare > 0) {
// at least 1 read handler and 1 write handler if (callQueueType.equals(CALL_QUEUE_TYPE_READ_STEAL_CONF_VALUE)) {
callExecutor = new FastPathRWQueueRpcExecutor("default.FPRWQ", Math.max(2, handlerCount), callExecutor = new StealReadJobRWQueueRpcExecutor("default.SRRWQ",
maxQueueLength, priority, conf, server); Math.max(2, handlerCount), maxQueueLength, priority, conf, server);
} else {
// at least 1 read handler and 1 write handler
callExecutor = new FastPathRWQueueRpcExecutor("default.FPRWQ", Math.max(2, handlerCount),
maxQueueLength, priority, conf, server);
}
} else { } else {
if ( if (
RpcExecutor.isFifoQueueType(callQueueType) || RpcExecutor.isCodelQueueType(callQueueType) RpcExecutor.isFifoQueueType(callQueueType) || RpcExecutor.isCodelQueueType(callQueueType)

View File

@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX })
@InterfaceStability.Evolving
public class StealReadJobRWQueueRpcExecutor extends RWQueueRpcExecutor {
private static final Logger LOG = LoggerFactory.getLogger(StealReadJobRWQueueRpcExecutor.class);
public StealReadJobRWQueueRpcExecutor(String name, int handlerCount, int maxQueueLength,
PriorityFunction priority, Configuration conf, Abortable abortable) {
super(name, handlerCount, maxQueueLength, priority, conf, abortable);
}
@Override
public void initQueues() {
queues = new ArrayList<>(this.numWriteQueues + this.numReadQueues + numScanQueues);
initializeQueues(numWriteQueues);
if (numReadQueues > 0 && numScanQueues > 0) {
int stealQueueCount = Math.min(numReadQueues, numScanQueues);
List<BlockingQueue<CallRunner>> stealScanQueues = new ArrayList<>(stealQueueCount);
for (int i = 0; i < stealQueueCount; i++) {
StealRpcJobQueue scanQueue = new StealRpcJobQueue(maxQueueLength, maxQueueLength);
BlockingQueue<CallRunner> readQueue = scanQueue.getStealFromQueue();
queues.add(readQueue);
stealScanQueues.add(scanQueue);
}
if (numReadQueues > numScanQueues) {
initializeQueues(numReadQueues - numScanQueues);
}
queues.addAll(stealScanQueues);
if (numScanQueues > numReadQueues) {
initializeQueues(numScanQueues - numReadQueues);
}
} else {
initializeQueues(Math.max(numReadQueues, numScanQueues));
}
}
}

View File

@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import java.util.Comparator;
import org.apache.hadoop.hbase.util.StealJobQueue;
import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
public class StealRpcJobQueue extends StealJobQueue<CallRunner> {
public final static RpcComparator RPCCOMPARATOR = new RpcComparator();
public StealRpcJobQueue(int initCapacity, int stealFromQueueInitCapacity) {
super(initCapacity, stealFromQueueInitCapacity, RPCCOMPARATOR);
}
public static class RpcComparator implements Comparator<CallRunner> {
public RpcComparator() {
super();
}
@Override
public int compare(CallRunner o1, CallRunner o2) {
long diff = o1.getRpcCall().getReceiveTime() - o2.getRpcCall().getReceiveTime();
if (diff > 0) {
return 1;
} else if (diff < 0) {
return -1;
} else {
return 0;
}
}
}
}

View File

@ -49,6 +49,10 @@ public class StealJobQueue<T> extends PriorityBlockingQueue<T> {
this(11, 11, comparator); this(11, 11, comparator);
} }
public StealJobQueue(int initCapacity, int stealFromQueueInitCapacity) {
this(initCapacity, stealFromQueueInitCapacity, null);
}
public StealJobQueue(int initCapacity, int stealFromQueueInitCapacity, public StealJobQueue(int initCapacity, int stealFromQueueInitCapacity,
Comparator<? super T> comparator) { Comparator<? super T> comparator) {
super(initCapacity, comparator); super(initCapacity, comparator);

View File

@ -65,17 +65,17 @@ public class TestMultiParallel {
private static final Logger LOG = LoggerFactory.getLogger(TestMultiParallel.class); private static final Logger LOG = LoggerFactory.getLogger(TestMultiParallel.class);
private static final HBaseTestingUtil UTIL = new HBaseTestingUtil(); protected static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
private static final byte[] VALUE = Bytes.toBytes("value"); private static final byte[] VALUE = Bytes.toBytes("value");
private static final byte[] QUALIFIER = Bytes.toBytes("qual"); private static final byte[] QUALIFIER = Bytes.toBytes("qual");
private static final String FAMILY = "family"; protected static final String FAMILY = "family";
private static final TableName TEST_TABLE = TableName.valueOf("multi_test_table"); protected static final TableName TEST_TABLE = TableName.valueOf("multi_test_table");
private static final byte[] BYTES_FAMILY = Bytes.toBytes(FAMILY); private static final byte[] BYTES_FAMILY = Bytes.toBytes(FAMILY);
private static final byte[] ONE_ROW = Bytes.toBytes("xxx"); private static final byte[] ONE_ROW = Bytes.toBytes("xxx");
private static final byte[][] KEYS = makeKeys(); private static final byte[][] KEYS = makeKeys();
private static final int slaves = 5; // also used for testing HTable pool size protected static final int slaves = 5; // also used for testing HTable pool size
private static Connection CONNECTION; protected static Connection CONNECTION;
@BeforeClass @BeforeClass
public static void beforeClass() throws Exception { public static void beforeClass() throws Exception {
@ -662,7 +662,7 @@ public class TestMultiParallel {
public static class MyMasterObserver implements MasterObserver, MasterCoprocessor { public static class MyMasterObserver implements MasterObserver, MasterCoprocessor {
private static final AtomicInteger postBalanceCount = new AtomicInteger(0); private static final AtomicInteger postBalanceCount = new AtomicInteger(0);
private static final AtomicBoolean start = new AtomicBoolean(false); protected static final AtomicBoolean start = new AtomicBoolean(false);
@Override @Override
public void start(CoprocessorEnvironment env) throws IOException { public void start(CoprocessorEnvironment env) throws IOException {

View File

@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertTrue;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.codec.KeyValueCodec;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.ipc.RWQueueRpcExecutor;
import org.apache.hadoop.hbase.ipc.RpcExecutor;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ MediumTests.class })
public class TestMultiParallel2 extends TestMultiParallel {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestMultiParallel2.class);
public static void beforeClass() throws Exception {
// Uncomment the following lines if more verbosity is needed for
// debugging (see HBASE-12285 for details).
// ((Log4JLogger)RpcServer.LOG).getLogger().setLevel(Level.ALL);
// ((Log4JLogger)RpcClient.LOG).getLogger().setLevel(Level.ALL);
// ((Log4JLogger)ScannerCallable.LOG).getLogger().setLevel(Level.ALL);
UTIL.getConfiguration().set(HConstants.RPC_CODEC_CONF_KEY,
KeyValueCodec.class.getCanonicalName());
// Disable table on master for now as the feature is broken
// UTIL.getConfiguration().setBoolean(LoadBalancer.TABLES_ON_MASTER, true);
// We used to ask for system tables on Master exclusively but not needed by test and doesn't
// work anyways -- so commented out.
// UTIL.getConfiguration().setBoolean(LoadBalancer.SYSTEM_TABLES_ON_MASTER, true);
UTIL.getConfiguration().set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY,
MyMasterObserver.class.getName());
String queueType = RpcExecutor.CALL_QUEUE_TYPE_READ_STEAL_CONF_VALUE;
UTIL.getConfiguration().set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, queueType);
UTIL.getConfiguration().setFloat(RWQueueRpcExecutor.CALL_QUEUE_READ_SHARE_CONF_KEY, 0.5f);
UTIL.getConfiguration().setFloat(RWQueueRpcExecutor.CALL_QUEUE_SCAN_SHARE_CONF_KEY, 1);
UTIL.startMiniCluster(slaves);
Table t = UTIL.createMultiRegionTable(TEST_TABLE, Bytes.toBytes(FAMILY));
UTIL.waitTableEnabled(TEST_TABLE);
t.close();
CONNECTION = ConnectionFactory.createConnection(UTIL.getConfiguration());
assertTrue(MyMasterObserver.start.get());
}
@Test
public void test() throws Exception {
testBatchWithGet();
}
}

View File

@ -52,15 +52,15 @@ public class TestScannersFromClientSide2 {
public static final HBaseClassTestRule CLASS_RULE = public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestScannersFromClientSide2.class); HBaseClassTestRule.forClass(TestScannersFromClientSide2.class);
private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); protected static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
private static TableName TABLE_NAME = TableName.valueOf("scan"); protected static TableName TABLE_NAME = TableName.valueOf("scan");
private static byte[] FAMILY = Bytes.toBytes("cf"); protected static byte[] FAMILY = Bytes.toBytes("cf");
private static byte[] CQ1 = Bytes.toBytes("cq1"); protected static byte[] CQ1 = Bytes.toBytes("cq1");
private static byte[] CQ2 = Bytes.toBytes("cq2"); protected static byte[] CQ2 = Bytes.toBytes("cq2");
@Parameter(0) @Parameter(0)
public boolean batch; public boolean batch;

View File

@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.ipc.RWQueueRpcExecutor;
import org.apache.hadoop.hbase.ipc.RpcExecutor;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ LargeTests.class, ClientTests.class })
public class TestScannersFromClientSide3 extends TestScannersFromClientSide2 {
@BeforeClass
public static void setUp() throws Exception {
String queueType = RpcExecutor.CALL_QUEUE_TYPE_READ_STEAL_CONF_VALUE;
TEST_UTIL.getConfiguration().set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, queueType);
TEST_UTIL.getConfiguration().setFloat(RWQueueRpcExecutor.CALL_QUEUE_READ_SHARE_CONF_KEY, 0.5f);
TEST_UTIL.getConfiguration().setFloat(RWQueueRpcExecutor.CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0.5f);
TEST_UTIL.startMiniCluster(3);
byte[][] splitKeys = new byte[8][];
for (int i = 111; i < 999; i += 111) {
splitKeys[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i));
}
Table table = TEST_UTIL.createTable(TABLE_NAME, FAMILY, splitKeys);
List<Put> puts = new ArrayList<>();
for (int i = 0; i < 1000; i++) {
puts.add(new Put(Bytes.toBytes(String.format("%03d", i)))
.addColumn(FAMILY, CQ1, Bytes.toBytes(i)).addColumn(FAMILY, CQ2, Bytes.toBytes(i * i)));
}
TEST_UTIL.waitTableAvailable(TABLE_NAME);
table.put(puts);
}
@Test
public void test() throws Exception {
testScanWithLimit();
testReversedScanWithLimit();
testReversedStartRowStopRowInclusive();
testStartRowStopRowInclusive();
}
}

View File

@ -810,4 +810,21 @@ public class TestSimpleRpcScheduler {
} }
}; };
} }
@Test
public void testStealReadRWQueue() throws Exception {
String queueType = RpcExecutor.CALL_QUEUE_TYPE_READ_STEAL_CONF_VALUE;
Configuration schedConf = HBaseConfiguration.create();
schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, queueType);
schedConf.setFloat(RWQueueRpcExecutor.CALL_QUEUE_READ_SHARE_CONF_KEY, 0.1f);
PriorityFunction priority = mock(PriorityFunction.class);
when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS);
SimpleRpcScheduler scheduler =
new SimpleRpcScheduler(schedConf, 0, 0, 0, priority, HConstants.QOS_THRESHOLD);
Field f = scheduler.getClass().getDeclaredField("callExecutor");
f.setAccessible(true);
assertTrue(f.get(scheduler) instanceof StealReadJobRWQueueRpcExecutor);
}
} }