HBASE-16388 Prevent client threads being blocked by only one slow region server

Signed-off-by: stack <stack@apache.org>
This commit is contained in:
Phil Yang 2016-09-14 13:21:01 +08:00 committed by stack
parent 8540171a45
commit 8ef6c76344
5 changed files with 201 additions and 6 deletions

View File

@ -23,6 +23,9 @@ import static org.apache.hadoop.hbase.ipc.IPCUtil.wrapException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.protobuf.BlockingRpcChannel;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Descriptors.MethodDescriptor;
@ -137,6 +140,16 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
private final ScheduledFuture<?> cleanupIdleConnectionTask;
private int maxConcurrentCallsPerServer;
/**
 * Per-server counters of calls that have been issued but whose completion callback has not run
 * yet. The cache is keyed by the remote server address; a counter is created at zero the first
 * time an address is looked up and is dropped once it has not been accessed for one hour.
 * Because the field is static, the counters are shared by every client instance that uses this
 * class rather than being tracked per client.
 */
private static final LoadingCache<InetSocketAddress, AtomicInteger> concurrentCounterCache =
CacheBuilder.newBuilder().expireAfterAccess(1, TimeUnit.HOURS).
build(new CacheLoader<InetSocketAddress, AtomicInteger>() {
@Override public AtomicInteger load(InetSocketAddress key) throws Exception {
return new AtomicInteger(0);
}
});
/**
* Construct an IPC client for the cluster <code>clusterId</code>
* @param conf configuration
@ -167,6 +180,9 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
this.readTO = conf.getInt(SOCKET_TIMEOUT_READ, DEFAULT_SOCKET_TIMEOUT_READ);
this.writeTO = conf.getInt(SOCKET_TIMEOUT_WRITE, DEFAULT_SOCKET_TIMEOUT_WRITE);
this.metrics = metrics;
this.maxConcurrentCallsPerServer = conf.getInt(
HConstants.HBASE_CLIENT_PERSERVER_REQUESTS_THRESHOLD,
HConstants.DEFAULT_HBASE_CLIENT_PERSERVER_REQUESTS_THRESHOLD);
this.connections = new PoolMap<>(getPoolType(conf), getPoolSize(conf));
@ -382,16 +398,22 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
final RpcCallback<Message> callback) {
final MetricsConnection.CallStats cs = MetricsConnection.newCallStats();
cs.setStartTime(EnvironmentEdgeManager.currentTime());
final AtomicInteger counter = concurrentCounterCache.getUnchecked(addr);
Call call = new Call(nextCallId(), md, param, hrc.cellScanner(), returnType,
hrc.getCallTimeout(), hrc.getPriority(), new RpcCallback<Call>() {
@Override
public void run(Call call) {
counter.decrementAndGet();
onCallFinished(call, hrc, addr, callback);
}
}, cs);
ConnectionId remoteId = new ConnectionId(ticket, md.getService().getName(), addr);
int count = counter.incrementAndGet();
try {
if (count > maxConcurrentCallsPerServer) {
throw new ServerTooBusyException(addr, count);
}
T connection = getConnection(remoteId);
connection.sendRequest(call, hrc);
} catch (Exception e) {

View File

@ -0,0 +1,38 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import java.net.InetSocketAddress;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
/**
 * Raised from an rpc call when too many requests are already pending for one region server.
 * As a {@link DoNotRetryIOException}, it signals that the rejected call should not be retried.
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class ServerTooBusyException extends DoNotRetryIOException {

  /**
   * @param address the server that has too many pending requests
   * @param count the number of concurrent requests observed for that server
   */
  public ServerTooBusyException(InetSocketAddress address, long count) {
    super(buildMessage(address, count));
  }

  /** Renders the exception message for the given server and request count. */
  private static String buildMessage(InetSocketAddress address, long count) {
    StringBuilder message = new StringBuilder("There are ");
    message.append(count);
    message.append(" concurrent rpc requests for ");
    message.append(address);
    return message.toString();
  }
}

View File

@ -737,6 +737,18 @@ public final class HConstants {
*/
public static final int DEFAULT_HBASE_CLIENT_MAX_PERREGION_TASKS = 1;
/**
* The maximum number of concurrent pending RPC requests to a single server, counted across the
* whole client process (process level).
*/
public static final String HBASE_CLIENT_PERSERVER_REQUESTS_THRESHOLD =
"hbase.client.perserver.requests.threshold";
/**
* Default value of {@link #HBASE_CLIENT_PERSERVER_REQUESTS_THRESHOLD}.
*/
public static final int DEFAULT_HBASE_CLIENT_PERSERVER_REQUESTS_THRESHOLD = Integer.MAX_VALUE;
/**
* Parameter name for server pause value, used mostly as value to wait before
* running a retry of a failed operation.

View File

@ -487,23 +487,33 @@ possible configurations would overwhelm and obscure the important.
<property>
<name>hbase.client.max.total.tasks</name>
<value>100</value>
<description>The maximum number of concurrent tasks a single HTable instance will
<description>The maximum number of concurrent mutation tasks a single HTable instance will
send to the cluster.</description>
</property>
<property>
<name>hbase.client.max.perserver.tasks</name>
<value>5</value>
<description>The maximum number of concurrent tasks a single HTable instance will
<description>The maximum number of concurrent mutation tasks a single HTable instance will
send to a single region server.</description>
</property>
<property>
<name>hbase.client.max.perregion.tasks</name>
<value>1</value>
<description>The maximum number of concurrent connections the client will
<description>The maximum number of concurrent mutation tasks the client will
maintain to a single Region. That is, if there are already
hbase.client.max.perregion.tasks writes in progress for this region, new puts
won't be sent to this region until some of those writes finish.</description>
</property>
<property>
<name>hbase.client.perserver.requests.threshold</name>
<value>2147483647</value>
<description>The max number of concurrent pending requests for one server in all client threads
(process level). Requests exceeding this limit fail immediately with a ServerTooBusyException, to
prevent the user's threads from being occupied and blocked by a single slow region server. If you
use a fixed number of threads to access HBase synchronously, setting this to a suitable value
related to the number of threads will help. See
https://issues.apache.org/jira/browse/HBASE-16388 for details.</description>
</property>
<property>
<name>hbase.client.scanner.caching</name>
<value>2147483647</value>

View File

@ -18,7 +18,6 @@
*/
package org.apache.hadoop.hbase.client;
import static org.junit.Assert.*;
import com.google.common.collect.Lists;
import java.io.IOException;
@ -61,10 +60,12 @@ import org.apache.hadoop.hbase.filter.FilterBase;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.ipc.ServerTooBusyException;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@ -85,8 +86,7 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import com.google.common.collect.Lists;
import com.google.protobuf.RpcController;
import static org.junit.Assert.fail;
/**
* This class is for testing HBaseConnectionManager features
@ -150,6 +150,12 @@ public class TestHCM {
final Get get, final List<Cell> results) throws IOException {
Threads.sleep(SLEEP_TIME);
}
/**
 * Sleeps for {@code SLEEP_TIME} each time this hook is invoked for a Put, mirroring the delay
 * the preceding hook applies to Gets. The put itself is not inspected or modified here.
 */
@Override
public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e,
final Put put, final WALEdit edit, final Durability durability) throws IOException {
Threads.sleep(SLEEP_TIME);
}
}
public static class SleepWriteCoprocessor extends BaseRegionObserver {
@ -187,6 +193,8 @@ public class TestHCM {
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, RPC_RETRY);
// simulate queue blocking in testDropTimeoutRequest
TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 1);
// Used in testServerBusyException
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_PERSERVER_REQUESTS_THRESHOLD, 3);
TEST_UTIL.startMiniCluster(2);
}
@ -1338,4 +1346,109 @@ public class TestHCM {
table.close();
connection.close();
}
/**
 * Worker thread that performs one Put against the supplied table and records whether the first
 * reported failure was caused by a {@link ServerTooBusyException}.
 */
private class TestPutThread extends Thread {
  Table table;
  // 1 when the put was rejected because the server was too busy, otherwise 0. Kept as an int so
  // callers can simply add the flags of several threads together after joining them.
  int getServerBusyException = 0;

  TestPutThread(Table table) {
    this.table = table;
  }

  @Override
  public void run() {
    try {
      Put put = new Put(ROW);
      put.addColumn(FAM_NAM, new byte[] { 0 }, new byte[] { 0 });
      table.put(put);
    } catch (RetriesExhaustedWithDetailsException e) {
      // Only the cause of the first recorded failure is examined.
      Throwable firstCause = e.exceptions.get(0).getCause();
      boolean serverTooBusy = firstCause instanceof ServerTooBusyException;
      if (serverTooBusy) {
        getServerBusyException = 1;
      }
    } catch (IOException ignore) {
    }
  }
}
/**
 * Worker thread that performs one Get against the supplied table and records whether it was
 * rejected with a {@link ServerTooBusyException}.
 */
private class TestGetThread extends Thread {
  Table table;
  // 1 when the get was rejected because the server was too busy, otherwise 0. Kept as an int so
  // callers can simply add the flags of several threads together after joining them.
  int getServerBusyException = 0;

  TestGetThread(Table table){
    this.table = table;
  }

  @Override
  public void run() {
    try {
      Get g = new Get(ROW);
      g.addColumn(FAM_NAM, new byte[]{0});
      table.get(g);
    } catch (RetriesExhaustedException e) {
      // The busy signal is expected two levels down the cause chain. Guard the intermediate
      // cause so that an exception without one is treated as "not busy" instead of killing
      // this thread with a NullPointerException.
      Throwable wrapper = e.getCause();
      if (wrapper != null && wrapper.getCause() instanceof ServerTooBusyException) {
        getServerBusyException = 1;
      }
    } catch (IOException ignore) {
      // Any other I/O failure is not what this thread measures; the flag stays at 0.
    }
  }
}
/**
 * Issues five concurrent Gets, then five concurrent Puts, against a table whose coprocessor
 * delays each request, and checks that exactly two requests of each kind are rejected with a
 * ServerTooBusyException. The class-level setup limits concurrent requests per server to 3, so
 * with five callers two must be turned away.
 */
@Test()
public void testServerBusyException() throws Exception {
  HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testServerBusy");
  hdt.addCoprocessor(SleepCoprocessor.class.getName());
  Configuration c = new Configuration(TEST_UTIL.getConfiguration());
  TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM }, c);

  final int threadCount = 5;
  final int expectedRejections = 2;

  Table[] getTables = new Table[threadCount];
  try {
    // Create every table and thread before starting any of them, so the requests overlap.
    TestGetThread[] getThreads = new TestGetThread[threadCount];
    for (int i = 0; i < threadCount; i++) {
      getTables[i] = TEST_UTIL.getConnection().getTable(hdt.getTableName());
      getThreads[i] = new TestGetThread(getTables[i]);
    }
    for (TestGetThread thread : getThreads) {
      thread.start();
    }
    int rejectedGets = 0;
    for (TestGetThread thread : getThreads) {
      thread.join();
      rejectedGets += thread.getServerBusyException;
    }
    assertEquals(expectedRejections, rejectedGets);
  } finally {
    // Release the tables even when the assertion fails; entries may be null if setup aborted.
    for (Table table : getTables) {
      if (table != null) {
        table.close();
      }
    }
  }

  // Put has its own logic in HTable, test Put alone. We use AsyncProcess for Put (use multi at
  // RPC level) and it wrap exceptions to RetriesExhaustedWithDetailsException.
  Table[] putTables = new Table[threadCount];
  try {
    TestPutThread[] putThreads = new TestPutThread[threadCount];
    for (int i = 0; i < threadCount; i++) {
      putTables[i] = TEST_UTIL.getConnection().getTable(hdt.getTableName());
      putThreads[i] = new TestPutThread(putTables[i]);
    }
    for (TestPutThread thread : putThreads) {
      thread.start();
    }
    int rejectedPuts = 0;
    for (TestPutThread thread : putThreads) {
      thread.join();
      rejectedPuts += thread.getServerBusyException;
    }
    assertEquals(expectedRejections, rejectedPuts);
  } finally {
    for (Table table : putTables) {
      if (table != null) {
        table.close();
      }
    }
  }
}
}