HBASE-8581 rpc refactor dropped passing the operation timeout through to the rpcclient
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1484942 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
4f45d96ab1
commit
f0ad092dd6
|
@ -243,9 +243,11 @@ public class HTable implements HTableInterface {
|
|||
*/
|
||||
private void finishSetup() throws IOException {
|
||||
this.connection.locateRegion(tableName, HConstants.EMPTY_START_ROW);
|
||||
this.operationTimeout = HTableDescriptor.isMetaTable(tableName) ? HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT
|
||||
: this.configuration.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
|
||||
HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
|
||||
this.operationTimeout = HTableDescriptor.isMetaTable(tableName) ?
|
||||
this.configuration.getInt(HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT,
|
||||
HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT):
|
||||
this.configuration.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
|
||||
HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
|
||||
this.writeBufferSize = this.configuration.getLong(
|
||||
"hbase.client.write.buffer", 2097152);
|
||||
this.clearBufferOnFail = true;
|
||||
|
@ -547,8 +549,7 @@ public class HTable implements HTableInterface {
|
|||
if (scan.getCaching() <= 0) {
|
||||
scan.setCaching(getScannerCaching());
|
||||
}
|
||||
return new ClientScanner(getConfiguration(), scan, getTableName(),
|
||||
this.connection);
|
||||
return new ClientScanner(getConfiguration(), scan, getTableName(), this.connection);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -888,6 +889,7 @@ public class HTable implements HTableInterface {
|
|||
try {
|
||||
GetRequest request = RequestConverter.buildGetRequest(
|
||||
location.getRegionInfo().getRegionName(), get, true);
|
||||
|
||||
GetResponse response = stub.get(null, request);
|
||||
return response.getExists();
|
||||
} catch (ServiceException se) {
|
||||
|
|
|
@ -206,13 +206,13 @@ public abstract class ServerCallable<T> implements Callable<T> {
|
|||
}
|
||||
|
||||
// If, after the planned sleep, there won't be enough time left, we stop now.
|
||||
if (((this.endTime - this.globalStartTime) + MIN_RPC_TIMEOUT + expectedSleep) >
|
||||
this.callTimeout) {
|
||||
long duration = singleCallDuration(expectedSleep);
|
||||
if (duration > this.callTimeout) {
|
||||
throw (SocketTimeoutException) new SocketTimeoutException(
|
||||
"Call to access row '" + Bytes.toString(row) + "' on table '"
|
||||
+ Bytes.toString(tableName)
|
||||
+ "' failed on timeout. " + " callTimeout=" + this.callTimeout +
|
||||
", time=" + (this.endTime - this.startTime)).initCause(t);
|
||||
", callDuration=" + duration).initCause(t);
|
||||
}
|
||||
} finally {
|
||||
afterCall();
|
||||
|
@ -226,6 +226,14 @@ public abstract class ServerCallable<T> implements Callable<T> {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param expectedSleep
|
||||
* @return Calculate how long a single call took
|
||||
*/
|
||||
private long singleCallDuration(final long expectedSleep) {
|
||||
return (this.endTime - this.globalStartTime) + MIN_RPC_TIMEOUT + expectedSleep;
|
||||
}
|
||||
|
||||
/**
|
||||
* Run this instance against the server once.
|
||||
* @return an object of type T
|
||||
|
|
|
@ -1491,6 +1491,14 @@ public class RpcClient {
|
|||
return rpcTimeout.get();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the lower of the thread-local RPC time from {@link #setRpcTimeout(int)} and the given
|
||||
* default timeout.
|
||||
*/
|
||||
public static int getRpcTimeout(int defaultTimeout) {
|
||||
return Math.min(defaultTimeout, rpcTimeout.get());
|
||||
}
|
||||
|
||||
public static void resetRpcTimeout() {
|
||||
rpcTimeout.remove();
|
||||
}
|
||||
|
@ -1575,7 +1583,9 @@ public class RpcClient {
|
|||
final User ticket, final int rpcTimeout) {
|
||||
this.isa = new InetSocketAddress(sn.getHostname(), sn.getPort());
|
||||
this.rpcClient = rpcClient;
|
||||
this.rpcTimeout = rpcTimeout;
|
||||
// Set the rpc timeout to be the minimum of configured timeout and whatever the current
|
||||
// thread local setting is.
|
||||
this.rpcTimeout = getRpcTimeout(rpcTimeout);
|
||||
this.ticket = ticket;
|
||||
}
|
||||
|
||||
|
|
|
@ -17,7 +17,10 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.SocketTimeoutException;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
@ -31,6 +34,7 @@ import org.apache.hadoop.hbase.exceptions.RegionServerStoppedException;
|
|||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService.BlockingInterface;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.mockito.Mockito;
|
||||
|
@ -87,6 +91,41 @@ public class TestClientNoCluster {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that operation timeout prevails over rpc default timeout and retries, etc.
|
||||
* @throws IOException
|
||||
*/
|
||||
@Test
|
||||
public void testRocTimeout() throws IOException {
|
||||
Configuration localConfig = HBaseConfiguration.create(this.conf);
|
||||
// This override mocks up our exists/get call to throw a RegionServerStoppedException.
|
||||
localConfig.set("hbase.client.connection.impl", RpcTimeoutConnection.class.getName());
|
||||
int pause = 10;
|
||||
localConfig.setInt("hbase.client.pause", pause);
|
||||
localConfig.setInt("hbase.client.retries.number", 10);
|
||||
// Set the operation timeout to be < the pause. Expectation is that after first pause, we will
|
||||
// fail out of the rpc because the rpc timeout will have been set to the operation tiemout
|
||||
// and it has expired. Otherwise, if this functionality is broke, all retries will be run --
|
||||
// all ten of them -- and we'll get the RetriesExhaustedException exception.
|
||||
localConfig.setInt(HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT, pause - 1);
|
||||
HTable table = new HTable(localConfig, HConstants.META_TABLE_NAME);
|
||||
Throwable t = null;
|
||||
try {
|
||||
// An exists call turns into a get w/ a flag.
|
||||
table.exists(new Get(Bytes.toBytes("abc")));
|
||||
} catch (SocketTimeoutException e) {
|
||||
// I expect this exception.
|
||||
LOG.info("Got expected exception", e);
|
||||
t = e;
|
||||
} catch (RetriesExhaustedException e) {
|
||||
// This is the old, unwanted behavior. If we get here FAIL!!!
|
||||
fail();
|
||||
} finally {
|
||||
table.close();
|
||||
}
|
||||
assertTrue(t != null);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDoNotRetryMetaScanner() throws IOException {
|
||||
this.conf.set("hbase.client.connection.impl",
|
||||
|
@ -197,4 +236,31 @@ public class TestClientNoCluster {
|
|||
return this.stub;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Override to check we are setting rpc timeout right.
|
||||
*/
|
||||
static class RpcTimeoutConnection
|
||||
extends HConnectionManager.HConnectionImplementation {
|
||||
final ClientService.BlockingInterface stub;
|
||||
|
||||
RpcTimeoutConnection(Configuration conf, boolean managed)
|
||||
throws IOException {
|
||||
super(conf, managed);
|
||||
// Mock up my stub so an exists call -- which turns into a get -- throws an exception
|
||||
this.stub = Mockito.mock(ClientService.BlockingInterface.class);
|
||||
try {
|
||||
Mockito.when(stub.get((RpcController)Mockito.any(),
|
||||
(ClientProtos.GetRequest)Mockito.any())).
|
||||
thenThrow(new ServiceException(new RegionServerStoppedException("From Mockito")));
|
||||
} catch (ServiceException e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public BlockingInterface getClient(ServerName sn) throws IOException {
|
||||
return this.stub;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,66 @@
|
|||
# Licensed to the Apache Software Foundation (ASF) under one
|
||||
# or more contributor license agreements. See the NOTICE file
|
||||
# distributed with this work for additional information
|
||||
# regarding copyright ownership. The ASF licenses this file
|
||||
# to you under the Apache License, Version 2.0 (the
|
||||
# "License"); you may not use this file except in compliance
|
||||
# with the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
# Define some default values that can be overridden by system properties
|
||||
hbase.root.logger=INFO,console
|
||||
hbase.log.dir=.
|
||||
hbase.log.file=hbase.log
|
||||
|
||||
# Define the root logger to the system property "hbase.root.logger".
|
||||
log4j.rootLogger=${hbase.root.logger}
|
||||
|
||||
# Logging Threshold
|
||||
log4j.threshhold=ALL
|
||||
|
||||
#
|
||||
# Daily Rolling File Appender
|
||||
#
|
||||
log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
|
||||
log4j.appender.DRFA.File=${hbase.log.dir}/${hbase.log.file}
|
||||
|
||||
# Rollver at midnight
|
||||
log4j.appender.DRFA.DatePattern=.yyyy-MM-dd
|
||||
|
||||
# 30-day backup
|
||||
#log4j.appender.DRFA.MaxBackupIndex=30
|
||||
log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
|
||||
# Debugging Pattern format
|
||||
log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %C{2}(%L): %m%n
|
||||
|
||||
|
||||
#
|
||||
# console
|
||||
# Add "console" to rootlogger above if you want to use this
|
||||
#
|
||||
log4j.appender.console=org.apache.log4j.ConsoleAppender
|
||||
log4j.appender.console.target=System.err
|
||||
log4j.appender.console.layout=org.apache.log4j.PatternLayout
|
||||
log4j.appender.console.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %C{2}(%L): %m%n
|
||||
|
||||
# Custom Logging levels
|
||||
|
||||
#log4j.logger.org.apache.hadoop.fs.FSNamesystem=DEBUG
|
||||
|
||||
log4j.logger.org.apache.hadoop=WARN
|
||||
log4j.logger.org.apache.zookeeper=ERROR
|
||||
log4j.logger.org.apache.hadoop.hbase=DEBUG
|
||||
|
||||
#These two settings are workarounds against spurious logs from the minicluster.
|
||||
#See HBASE-4709
|
||||
log4j.org.apache.hadoop.metrics2.impl.MetricsSystemImpl=ERROR
|
||||
log4j.org.apache.hadoop.metrics2.util.MBeans=ERROR
|
||||
# Enable this to get detailed connection error/retry logging.
|
||||
# log4j.logger.org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation=TRACE
|
|
@ -264,6 +264,10 @@ public final class HConstants {
|
|||
/** Parameter name for HBase client operation timeout, which overrides RPC timeout */
|
||||
public static final String HBASE_CLIENT_OPERATION_TIMEOUT = "hbase.client.operation.timeout";
|
||||
|
||||
/** Parameter name for HBase client operation timeout, which overrides RPC timeout */
|
||||
public static final String HBASE_CLIENT_META_OPERATION_TIMEOUT =
|
||||
"hbase.client.meta.operation.timeout";
|
||||
|
||||
/** Default HBase client operation timeout, which is tantamount to a blocking call */
|
||||
public static final int DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT = Integer.MAX_VALUE;
|
||||
|
||||
|
|
Loading…
Reference in New Issue