HBASE-16388 Prevent client threads being blocked by only one slow region server

Signed-off-by: stack <stack@apache.org>
This commit is contained in:
Phil Yang 2016-09-14 14:02:34 +08:00 committed by stack
parent ac1ee77f40
commit 069d1f73fa
5 changed files with 194 additions and 4 deletions

View File

@ -19,6 +19,9 @@
package org.apache.hadoop.hbase.ipc; package org.apache.hadoop.hbase.ipc;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.protobuf.BlockingRpcChannel; import com.google.protobuf.BlockingRpcChannel;
import com.google.protobuf.Descriptors; import com.google.protobuf.Descriptors;
import com.google.protobuf.Message; import com.google.protobuf.Message;
@ -31,6 +34,8 @@ import java.net.InetSocketAddress;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.net.SocketTimeoutException; import java.net.SocketTimeoutException;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -80,6 +85,16 @@ public abstract class AbstractRpcClient implements RpcClient {
protected final int readTO; protected final int readTO;
protected final int writeTO; protected final int writeTO;
private int maxConcurrentCallsPerServer;
private static final LoadingCache<InetSocketAddress, AtomicInteger> concurrentCounterCache =
CacheBuilder.newBuilder().expireAfterAccess(1, TimeUnit.HOURS).
build(new CacheLoader<InetSocketAddress, AtomicInteger>() {
@Override public AtomicInteger load(InetSocketAddress key) throws Exception {
return new AtomicInteger(0);
}
});
/** /**
* Construct an IPC client for the cluster <code>clusterId</code> * Construct an IPC client for the cluster <code>clusterId</code>
* *
@ -110,6 +125,9 @@ public abstract class AbstractRpcClient implements RpcClient {
this.readTO = conf.getInt(SOCKET_TIMEOUT_READ, DEFAULT_SOCKET_TIMEOUT_READ); this.readTO = conf.getInt(SOCKET_TIMEOUT_READ, DEFAULT_SOCKET_TIMEOUT_READ);
this.writeTO = conf.getInt(SOCKET_TIMEOUT_WRITE, DEFAULT_SOCKET_TIMEOUT_WRITE); this.writeTO = conf.getInt(SOCKET_TIMEOUT_WRITE, DEFAULT_SOCKET_TIMEOUT_WRITE);
this.metrics = metrics; this.metrics = metrics;
this.maxConcurrentCallsPerServer = conf.getInt(
HConstants.HBASE_CLIENT_PERSERVER_REQUESTS_THRESHOLD,
HConstants.DEFAULT_HBASE_CLIENT_PERSERVER_REQUESTS_THRESHOLD);
// login the server principal (if using secure Hadoop) // login the server principal (if using secure Hadoop)
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
@ -221,7 +239,12 @@ public abstract class AbstractRpcClient implements RpcClient {
} }
Pair<Message, CellScanner> val; Pair<Message, CellScanner> val;
AtomicInteger counter = concurrentCounterCache.getUnchecked(isa);
int count = counter.incrementAndGet();
try { try {
if (count > maxConcurrentCallsPerServer) {
throw new ServerTooBusyException(isa, count);
}
final MetricsConnection.CallStats cs = MetricsConnection.newCallStats(); final MetricsConnection.CallStats cs = MetricsConnection.newCallStats();
cs.setStartTime(EnvironmentEdgeManager.currentTime()); cs.setStartTime(EnvironmentEdgeManager.currentTime());
val = call(pcrc, md, param, returnType, ticket, isa, cs); val = call(pcrc, md, param, returnType, ticket, isa, cs);
@ -238,6 +261,8 @@ public abstract class AbstractRpcClient implements RpcClient {
return val.getFirst(); return val.getFirst();
} catch (Throwable e) { } catch (Throwable e) {
throw new ServiceException(e); throw new ServiceException(e);
} finally {
counter.decrementAndGet();
} }
} }

View File

@ -0,0 +1,38 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import java.net.InetSocketAddress;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
/**
* Throw this in rpc call if there are too many pending requests for one region server
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class ServerTooBusyException extends DoNotRetryIOException {
public ServerTooBusyException(InetSocketAddress address, long count){
super("There are "+count+" concurrent rpc requests for "+address);
}
}

View File

@ -722,6 +722,18 @@ public final class HConstants {
*/ */
public static final int DEFAULT_HBASE_CLIENT_MAX_PERREGION_TASKS = 1; public static final int DEFAULT_HBASE_CLIENT_MAX_PERREGION_TASKS = 1;
/**
* The maximum number of concurrent pending RPC requests for one server in process level.
*/
public static final String HBASE_CLIENT_PERSERVER_REQUESTS_THRESHOLD =
"hbase.client.perserver.requests.threshold";
/**
* Default value of {@link #HBASE_CLIENT_PERSERVER_REQUESTS_THRESHOLD}.
*/
public static final int DEFAULT_HBASE_CLIENT_PERSERVER_REQUESTS_THRESHOLD = Integer.MAX_VALUE;
/** /**
* Parameter name for server pause value, used mostly as value to wait before * Parameter name for server pause value, used mostly as value to wait before
* running a retry of a failed operation. * running a retry of a failed operation.

View File

@ -493,23 +493,33 @@ possible configurations would overwhelm and obscure the important.
<property> <property>
<name>hbase.client.max.total.tasks</name> <name>hbase.client.max.total.tasks</name>
<value>100</value> <value>100</value>
<description>The maximum number of concurrent tasks a single HTable instance will <description>The maximum number of concurrent mutation tasks a single HTable instance will
send to the cluster.</description> send to the cluster.</description>
</property> </property>
<property> <property>
<name>hbase.client.max.perserver.tasks</name> <name>hbase.client.max.perserver.tasks</name>
<value>5</value> <value>5</value>
<description>The maximum number of concurrent tasks a single HTable instance will <description>The maximum number of concurrent mutation tasks a single HTable instance will
send to a single region server.</description> send to a single region server.</description>
</property> </property>
<property> <property>
<name>hbase.client.max.perregion.tasks</name> <name>hbase.client.max.perregion.tasks</name>
<value>1</value> <value>1</value>
<description>The maximum number of concurrent connections the client will <description>The maximum number of concurrent mutation tasks the client will
maintain to a single Region. That is, if there is already maintain to a single Region. That is, if there is already
hbase.client.max.perregion.tasks writes in progress for this region, new puts hbase.client.max.perregion.tasks writes in progress for this region, new puts
won't be sent to this region until some writes finishes.</description> won't be sent to this region until some writes finishes.</description>
</property> </property>
<property>
<name>hbase.client.perserver.requests.threshold</name>
<value>2147483647</value>
<description>The max number of concurrent pending requests for one server in all client threads
(process level). Exceeding requests will be thrown ServerTooBusyException immediately to prevent
user's threads being occupied and blocked by only one slow region server. If you use a fix
number of threads to access HBase in a synchronous way, set this to a suitable value which is
related to the number of threads will help you. See
https://issues.apache.org/jira/browse/HBASE-16388 for details.</description>
</property>
<property> <property>
<name>hbase.client.scanner.caching</name> <name>hbase.client.scanner.caching</name>
<value>2147483647</value> <value>2147483647</value>

View File

@ -18,7 +18,6 @@
*/ */
package org.apache.hadoop.hbase.client; package org.apache.hadoop.hbase.client;
import static org.junit.Assert.*;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import java.io.IOException; import java.io.IOException;
@ -64,10 +63,12 @@ import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterBase; import org.apache.hadoop.hbase.filter.FilterBase;
import org.apache.hadoop.hbase.ipc.RpcClient; import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.ipc.ServerTooBusyException;
import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@ -155,6 +156,12 @@ public class TestHCM {
final Get get, final List<Cell> results) throws IOException { final Get get, final List<Cell> results) throws IOException {
Threads.sleep(SLEEP_TIME); Threads.sleep(SLEEP_TIME);
} }
@Override
public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e,
final Put put, final WALEdit edit, final Durability durability) throws IOException {
Threads.sleep(SLEEP_TIME);
}
} }
public static class SleepWriteCoprocessor extends BaseRegionObserver { public static class SleepWriteCoprocessor extends BaseRegionObserver {
@ -191,8 +198,13 @@ public class TestHCM {
// Up the handlers; this test needs more than usual. // Up the handlers; this test needs more than usual.
TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10); TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10);
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, RPC_RETRY); TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, RPC_RETRY);
// simulate queue blocking in testDropTimeoutRequest // simulate queue blocking in testDropTimeoutRequest
TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 1); TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 1);
// Used in testServerBusyException
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_PERSERVER_REQUESTS_THRESHOLD, 3);
TEST_UTIL.startMiniCluster(2); TEST_UTIL.startMiniCluster(2);
} }
@ -1500,4 +1512,97 @@ public class TestHCM {
table.close(); table.close();
connection.close(); connection.close();
} }
private class TestGetThread extends Thread {
Table table;
int getServerBusyException = 0;
TestGetThread(Table table){
this.table = table;
}
@Override
public void run() {
try {
table.get(new Get(ROW));
} catch (ServerTooBusyException e) {
getServerBusyException = 1;
} catch (IOException ignore) {
}
}
}
private class TestPutThread extends Thread {
Table table;
int getServerBusyException = 0;
TestPutThread(Table table){
this.table = table;
}
@Override
public void run() {
try {
Put p = new Put(ROW);
p.addColumn(FAM_NAM,new byte[]{0}, new byte[]{0});
table.put(p);
} catch (RetriesExhaustedWithDetailsException e) {
// For put we use AsyncProcess and it will wrap all exceptions to this.
if (e.exceptions.get(0) instanceof ServerTooBusyException) {
getServerBusyException = 1;
}
} catch (IOException ignore) {
}
}
}
@Test()
public void testServerBusyException() throws Exception {
HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testServerBusy");
hdt.addCoprocessor(SleepCoprocessor.class.getName());
Configuration c = new Configuration(TEST_UTIL.getConfiguration());
TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM }, c);
TestGetThread tg1 = new TestGetThread(TEST_UTIL.getConnection().getTable(hdt.getTableName()));
TestGetThread tg2 = new TestGetThread(TEST_UTIL.getConnection().getTable(hdt.getTableName()));
TestGetThread tg3 = new TestGetThread(TEST_UTIL.getConnection().getTable(hdt.getTableName()));
TestGetThread tg4 = new TestGetThread(TEST_UTIL.getConnection().getTable(hdt.getTableName()));
TestGetThread tg5 = new TestGetThread(TEST_UTIL.getConnection().getTable(hdt.getTableName()));
tg1.start();
tg2.start();
tg3.start();
tg4.start();
tg5.start();
tg1.join();
tg2.join();
tg3.join();
tg4.join();
tg5.join();
assertEquals(2,
tg1.getServerBusyException + tg2.getServerBusyException + tg3.getServerBusyException
+ tg4.getServerBusyException + tg5.getServerBusyException);
// Put has its own logic in HTable, test Put alone. We use AsyncProcess for Put (use multi at
// RPC level) and it wrap exceptions to RetriesExhaustedWithDetailsException.
TestPutThread tp1 = new TestPutThread(TEST_UTIL.getConnection().getTable(hdt.getTableName()));
TestPutThread tp2 = new TestPutThread(TEST_UTIL.getConnection().getTable(hdt.getTableName()));
TestPutThread tp3 = new TestPutThread(TEST_UTIL.getConnection().getTable(hdt.getTableName()));
TestPutThread tp4 = new TestPutThread(TEST_UTIL.getConnection().getTable(hdt.getTableName()));
TestPutThread tp5 = new TestPutThread(TEST_UTIL.getConnection().getTable(hdt.getTableName()));
tp1.start();
tp2.start();
tp3.start();
tp4.start();
tp5.start();
tp1.join();
tp2.join();
tp3.join();
tp4.join();
tp5.join();
assertEquals(2,
tp1.getServerBusyException + tp2.getServerBusyException + tp3.getServerBusyException
+ tp4.getServerBusyException + tp5.getServerBusyException);
}
} }