HBASE-27490 Locating regions for all actions of batch requests can exceed operation timeout (#4908)
Signed-off-by: Xiaolin Ha <haxiaolin@apache.org>
This commit is contained in:
parent
3e3c8fa700
commit
ed8549b3f6
|
@ -409,6 +409,28 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
|
||||||
return new SingleServerRequestRunnable(multiAction, numAttempt, server, callsInProgress);
|
return new SingleServerRequestRunnable(multiAction, numAttempt, server, callsInProgress);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Some checked calls send a callable with their own tracker. This method checks the operation
|
||||||
|
* timeout against the appropriate tracker, or returns false if no tracker.
|
||||||
|
*/
|
||||||
|
private boolean isOperationTimeoutExceeded() {
|
||||||
|
RetryingTimeTracker currentTracker;
|
||||||
|
if (tracker != null) {
|
||||||
|
currentTracker = tracker;
|
||||||
|
} else if (currentCallable != null && currentCallable.getTracker() != null) {
|
||||||
|
currentTracker = currentCallable.getTracker();
|
||||||
|
} else {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
// no-op if already started, this is just to ensure it was initialized (usually true)
|
||||||
|
currentTracker.start();
|
||||||
|
|
||||||
|
// return value of 1 is special to mean exceeded, to differentiate from 0
|
||||||
|
// which is no timeout. see implementation of getRemainingTime
|
||||||
|
return currentTracker.getRemainingTime(operationTimeout) == 1;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Group a list of actions per region servers, and send them.
|
* Group a list of actions per region servers, and send them.
|
||||||
* @param currentActions - the list of row to submit
|
* @param currentActions - the list of row to submit
|
||||||
|
@ -420,6 +442,27 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
|
||||||
boolean isReplica = false;
|
boolean isReplica = false;
|
||||||
List<Action> unknownReplicaActions = null;
|
List<Action> unknownReplicaActions = null;
|
||||||
for (Action action : currentActions) {
|
for (Action action : currentActions) {
|
||||||
|
if (isOperationTimeoutExceeded()) {
|
||||||
|
String message = numAttempt == 1
|
||||||
|
? "Operation timeout exceeded during resolution of region locations, "
|
||||||
|
+ "prior to executing any actions."
|
||||||
|
: "Operation timeout exceeded during re-resolution of region locations on retry "
|
||||||
|
+ (numAttempt - 1) + ".";
|
||||||
|
|
||||||
|
message += " Meta may be slow or operation timeout too short for batch size or retries.";
|
||||||
|
OperationTimeoutExceededException exception =
|
||||||
|
new OperationTimeoutExceededException(message);
|
||||||
|
|
||||||
|
// Clear any actions we already resolved, because none will have been executed yet
|
||||||
|
// We are going to fail all passed actions because there's no way we can execute any
|
||||||
|
// if operation timeout is exceeded.
|
||||||
|
actionsByServer.clear();
|
||||||
|
for (Action actionToFail : currentActions) {
|
||||||
|
manageLocationError(actionToFail, exception);
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
RegionLocations locs = findAllLocationsOrFail(action, true);
|
RegionLocations locs = findAllLocationsOrFail(action, true);
|
||||||
if (locs == null) continue;
|
if (locs == null) continue;
|
||||||
boolean isReplicaAction = !RegionReplicaUtil.isDefaultReplica(action.getReplicaId());
|
boolean isReplicaAction = !RegionReplicaUtil.isDefaultReplica(action.getReplicaId());
|
||||||
|
|
|
@ -122,4 +122,8 @@ abstract class CancellableRegionServerCallable<T> extends ClientServiceCallable<
|
||||||
throws org.apache.hbase.thirdparty.com.google.protobuf.ServiceException {
|
throws org.apache.hbase.thirdparty.com.google.protobuf.ServiceException {
|
||||||
return getStub().cleanupBulkLoad(getRpcController(), request);
|
return getStub().cleanupBulkLoad(getRpcController(), request);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
RetryingTimeTracker getTracker() {
|
||||||
|
return tracker;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.MiniHBaseCluster;
|
import org.apache.hadoop.hbase.MiniHBaseCluster;
|
||||||
|
import org.apache.hadoop.hbase.NotServingRegionException;
|
||||||
import org.apache.hadoop.hbase.StartMiniClusterOption;
|
import org.apache.hadoop.hbase.StartMiniClusterOption;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.ipc.CallTimeoutException;
|
import org.apache.hadoop.hbase.ipc.CallTimeoutException;
|
||||||
|
@ -79,6 +80,8 @@ public class TestClientOperationTimeout {
|
||||||
private static int DELAY_BATCH;
|
private static int DELAY_BATCH;
|
||||||
private static int DELAY_META_SCAN;
|
private static int DELAY_META_SCAN;
|
||||||
|
|
||||||
|
private static boolean FAIL_BATCH = false;
|
||||||
|
|
||||||
private static final TableName TABLE_NAME = TableName.valueOf("Timeout");
|
private static final TableName TABLE_NAME = TableName.valueOf("Timeout");
|
||||||
private static final byte[] FAMILY = Bytes.toBytes("family");
|
private static final byte[] FAMILY = Bytes.toBytes("family");
|
||||||
private static final byte[] ROW = Bytes.toBytes("row");
|
private static final byte[] ROW = Bytes.toBytes("row");
|
||||||
|
@ -113,6 +116,7 @@ public class TestClientOperationTimeout {
|
||||||
DELAY_MUTATE = 0;
|
DELAY_MUTATE = 0;
|
||||||
DELAY_BATCH = 0;
|
DELAY_BATCH = 0;
|
||||||
DELAY_META_SCAN = 0;
|
DELAY_META_SCAN = 0;
|
||||||
|
FAIL_BATCH = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
@AfterClass
|
@AfterClass
|
||||||
|
@ -196,17 +200,16 @@ public class TestClientOperationTimeout {
|
||||||
/**
|
/**
|
||||||
* Tests that a batch get on a table throws
|
* Tests that a batch get on a table throws
|
||||||
* {@link org.apache.hadoop.hbase.client.OperationTimeoutExceededException} when the region lookup
|
* {@link org.apache.hadoop.hbase.client.OperationTimeoutExceededException} when the region lookup
|
||||||
* takes longer than the 'hbase.client.operation.timeout'
|
* takes longer than the 'hbase.client.operation.timeout'. This specifically tests that when meta
|
||||||
|
* is slow, the fetching of region locations for a batch is not allowed to itself exceed the
|
||||||
|
* operation timeout. In a batch size of 100, it's possible to need to make 100 meta calls in
|
||||||
|
* sequence. If meta is slow, we should abort the request once the operation timeout is exceeded,
|
||||||
|
* even if we haven't finished locating all regions. See HBASE-27490
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testMultiGetMetaTimeout() throws IOException {
|
public void testMultiGetMetaTimeout() throws IOException {
|
||||||
|
|
||||||
Configuration conf = new Configuration(UTIL.getConfiguration());
|
Configuration conf = new Configuration(UTIL.getConfiguration());
|
||||||
|
|
||||||
// the operation timeout must be lower than the delay from a meta scan to etch region locations
|
|
||||||
// of the get requests. Simply increasing the meta scan timeout to greater than the
|
|
||||||
// HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD will result in SocketTimeoutException on the scans thus
|
|
||||||
// avoiding the simulation of load on meta. See: HBASE-27487
|
|
||||||
conf.setLong(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 400);
|
conf.setLong(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 400);
|
||||||
conf.setBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, true);
|
conf.setBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, true);
|
||||||
try (Connection specialConnection = ConnectionFactory.createConnection(conf);
|
try (Connection specialConnection = ConnectionFactory.createConnection(conf);
|
||||||
|
@ -216,7 +219,9 @@ public class TestClientOperationTimeout {
|
||||||
((ConnectionImplementation) specialConnection).getConnectionMetrics();
|
((ConnectionImplementation) specialConnection).getConnectionMetrics();
|
||||||
long metaCacheNumClearServerPreFailure = metrics.getMetaCacheNumClearServer().getCount();
|
long metaCacheNumClearServerPreFailure = metrics.getMetaCacheNumClearServer().getCount();
|
||||||
|
|
||||||
|
// delay and timeout are the same, so we should see a timeout after the first region lookup
|
||||||
DELAY_META_SCAN = 400;
|
DELAY_META_SCAN = 400;
|
||||||
|
|
||||||
List<Get> gets = new ArrayList<>();
|
List<Get> gets = new ArrayList<>();
|
||||||
// we need to ensure the region look-ups eat up more time than the operation timeout without
|
// we need to ensure the region look-ups eat up more time than the operation timeout without
|
||||||
// exceeding the scan timeout.
|
// exceeding the scan timeout.
|
||||||
|
@ -237,11 +242,70 @@ public class TestClientOperationTimeout {
|
||||||
|
|
||||||
for (Throwable cause : expected.getCauses()) {
|
for (Throwable cause : expected.getCauses()) {
|
||||||
Assert.assertTrue(cause instanceof OperationTimeoutExceededException);
|
Assert.assertTrue(cause instanceof OperationTimeoutExceededException);
|
||||||
|
// Check that this is the timeout thrown by AsyncRequestFutureImpl during region lookup
|
||||||
|
Assert.assertTrue(cause.getMessage().contains("Operation timeout exceeded during"));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests that a batch get on a table throws
|
||||||
|
* {@link org.apache.hadoop.hbase.client.OperationTimeoutExceededException} when retries are tuned
|
||||||
|
* too high to be able to be processed within the operation timeout. In this case, the final
|
||||||
|
* OperationTimeoutExceededException should not trigger a cache clear (but the individual failures
|
||||||
|
* may, if appropriate). This test skirts around the timeout checks during meta lookups from
|
||||||
|
* HBASE-27490, because we want to test for the case where meta lookups were able to succeed in
|
||||||
|
* time but did not leave enough time for the actual calls to occur. See HBASE-27487
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testMultiGetRetryTimeout() {
|
||||||
|
Configuration conf = new Configuration(UTIL.getConfiguration());
|
||||||
|
|
||||||
|
// allow 1 retry, and 0 backoff
|
||||||
|
conf.setLong(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 500);
|
||||||
|
conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
|
||||||
|
conf.setLong(HConstants.HBASE_CLIENT_PAUSE, 0);
|
||||||
|
conf.setBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, true);
|
||||||
|
|
||||||
|
try (Connection specialConnection = ConnectionFactory.createConnection(conf);
|
||||||
|
Table specialTable = specialConnection.getTable(TABLE_NAME)) {
|
||||||
|
|
||||||
|
MetricsConnection metrics =
|
||||||
|
((ConnectionImplementation) specialConnection).getConnectionMetrics();
|
||||||
|
long metaCacheNumClearServerPreFailure = metrics.getMetaCacheNumClearServer().getCount();
|
||||||
|
|
||||||
|
// meta scan should take up most of the timeout but not all
|
||||||
|
DELAY_META_SCAN = 300;
|
||||||
|
// fail the batch call, causing a retry
|
||||||
|
FAIL_BATCH = true;
|
||||||
|
|
||||||
|
// Use a batch size of 1 so that we only make 1 meta call per attempt
|
||||||
|
List<Get> gets = new ArrayList<>();
|
||||||
|
gets.add(new Get(Bytes.toBytes(0)).addColumn(FAMILY, QUALIFIER));
|
||||||
|
|
||||||
|
try {
|
||||||
|
specialTable.batch(gets, new Object[1]);
|
||||||
|
Assert.fail("should not reach here");
|
||||||
|
} catch (Exception e) {
|
||||||
|
RetriesExhaustedWithDetailsException expected = (RetriesExhaustedWithDetailsException) e;
|
||||||
|
Assert.assertEquals(1, expected.getNumExceptions());
|
||||||
|
|
||||||
|
// We expect that the error caused by FAIL_BATCH would clear the meta cache but
|
||||||
|
// the OperationTimeoutExceededException should not. So only allow new cache clear here
|
||||||
|
long metaCacheNumClearServerPostFailure = metrics.getMetaCacheNumClearServer().getCount();
|
||||||
|
Assert.assertEquals(metaCacheNumClearServerPreFailure + 1,
|
||||||
|
metaCacheNumClearServerPostFailure);
|
||||||
|
|
||||||
|
for (Throwable cause : expected.getCauses()) {
|
||||||
|
Assert.assertTrue(cause instanceof OperationTimeoutExceededException);
|
||||||
|
// Check that this is the timeout thrown by CancellableRegionServerCallable
|
||||||
|
Assert.assertTrue(cause.getMessage().contains("Timeout exceeded before call began"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -322,6 +386,9 @@ public class TestClientOperationTimeout {
|
||||||
public ClientProtos.MultiResponse multi(RpcController rpcc, ClientProtos.MultiRequest request)
|
public ClientProtos.MultiResponse multi(RpcController rpcc, ClientProtos.MultiRequest request)
|
||||||
throws ServiceException {
|
throws ServiceException {
|
||||||
try {
|
try {
|
||||||
|
if (FAIL_BATCH) {
|
||||||
|
throw new ServiceException(new NotServingRegionException("simulated failure"));
|
||||||
|
}
|
||||||
Thread.sleep(DELAY_BATCH);
|
Thread.sleep(DELAY_BATCH);
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
LOG.error("Sleep interrupted during multi operation", e);
|
LOG.error("Sleep interrupted during multi operation", e);
|
||||||
|
|
Loading…
Reference in New Issue