HBASE-27487: Slow meta can create pathological feedback loop with multigets (#4900)
Signed-off-by: Bryan Beaudreault <bbeaudreault@apache.org> Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
parent
75e49befb6
commit
1bfc58a369
|
@ -207,6 +207,14 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
|
||||||
// Cancelled
|
// Cancelled
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
} catch (OperationTimeoutExceededException e) {
|
||||||
|
// The operation has timed out before executing the actual callable. This may be due to
|
||||||
|
// slow/hotspotted meta or the operation timeout set too low for the number of requests.
|
||||||
|
// Circumventing the usual failure flow ensure the meta cache is not cleared and will not
|
||||||
|
// result in a doomed feedback loop in which the meta continues to be hotspotted.
|
||||||
|
// See HBASE-27487
|
||||||
|
failAll(multiAction, server, numAttempt, e);
|
||||||
|
return;
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
// The service itself failed . It may be an error coming from the communication
|
// The service itself failed . It may be an error coming from the communication
|
||||||
// layer, but, as well, a functional error raised by the server.
|
// layer, but, as well, a functional error raised by the server.
|
||||||
|
@ -676,6 +684,25 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
|
||||||
return canRetry;
|
return canRetry;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Fail all the actions from this multiaction after an OperationTimeoutExceededException
|
||||||
|
* @param actions the actions still to do from the initial list
|
||||||
|
* @param server the destination
|
||||||
|
* @param numAttempt the number of attempts so far
|
||||||
|
* @param throwable the throwable that caused the failure
|
||||||
|
*/
|
||||||
|
private void failAll(MultiAction actions, ServerName server, int numAttempt,
|
||||||
|
Throwable throwable) {
|
||||||
|
int failed = 0;
|
||||||
|
for (Map.Entry<byte[], List<Action>> e : actions.actions.entrySet()) {
|
||||||
|
for (Action action : e.getValue()) {
|
||||||
|
setError(action.getOriginalIndex(), action.getAction(), throwable, server);
|
||||||
|
++failed;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
logNoResubmit(server, numAttempt, actions.size(), throwable, failed, 0);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Resubmit all the actions from this multiaction after a failure.
|
* Resubmit all the actions from this multiaction after a failure.
|
||||||
* @param rsActions the actions still to do from the initial list
|
* @param rsActions the actions still to do from the initial list
|
||||||
|
|
|
@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.client;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InterruptedIOException;
|
import java.io.InterruptedIOException;
|
||||||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
|
||||||
import org.apache.hadoop.hbase.ServerName;
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
@ -64,7 +63,10 @@ abstract class CancellableRegionServerCallable<T> extends ClientServiceCallable<
|
||||||
int remainingTime = tracker.getRemainingTime(operationTimeout);
|
int remainingTime = tracker.getRemainingTime(operationTimeout);
|
||||||
if (remainingTime <= 1) {
|
if (remainingTime <= 1) {
|
||||||
// "1" is a special return value in RetryingTimeTracker, see its implementation.
|
// "1" is a special return value in RetryingTimeTracker, see its implementation.
|
||||||
throw new DoNotRetryIOException("Operation rpcTimeout");
|
throw new OperationTimeoutExceededException(
|
||||||
|
"Timeout exceeded before call began. Meta requests may be slow, the operation "
|
||||||
|
+ "timeout is too short for the number of requests, or the configured retries "
|
||||||
|
+ "can't complete in the operation timeout.");
|
||||||
}
|
}
|
||||||
return super.call(Math.min(rpcTimeout, remainingTime));
|
return super.call(Math.min(rpcTimeout, remainingTime));
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,41 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.client;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||||
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Thrown when a batch operation exceeds the operation timeout
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Public
|
||||||
|
public class OperationTimeoutExceededException extends DoNotRetryIOException {
|
||||||
|
|
||||||
|
public OperationTimeoutExceededException() {
|
||||||
|
super();
|
||||||
|
}
|
||||||
|
|
||||||
|
public OperationTimeoutExceededException(String msg) {
|
||||||
|
super(msg);
|
||||||
|
}
|
||||||
|
|
||||||
|
public OperationTimeoutExceededException(String msg, Throwable t) {
|
||||||
|
super(msg, t);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -15,23 +15,21 @@
|
||||||
* See the License for the specific language governing permissions and
|
* See the License for the specific language governing permissions and
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase;
|
package org.apache.hadoop.hbase.client;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.hbase.client.MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.SocketTimeoutException;
|
import java.net.SocketTimeoutException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
import org.apache.hadoop.hbase.client.Connection;
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.client.Get;
|
import org.apache.hadoop.hbase.MiniHBaseCluster;
|
||||||
import org.apache.hadoop.hbase.client.Put;
|
import org.apache.hadoop.hbase.StartMiniClusterOption;
|
||||||
import org.apache.hadoop.hbase.client.ResultScanner;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
|
|
||||||
import org.apache.hadoop.hbase.client.Scan;
|
|
||||||
import org.apache.hadoop.hbase.client.Table;
|
|
||||||
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
|
||||||
import org.apache.hadoop.hbase.ipc.CallTimeoutException;
|
import org.apache.hadoop.hbase.ipc.CallTimeoutException;
|
||||||
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||||
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
|
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
|
||||||
|
@ -78,7 +76,8 @@ public class TestClientOperationTimeout {
|
||||||
private static int DELAY_GET;
|
private static int DELAY_GET;
|
||||||
private static int DELAY_SCAN;
|
private static int DELAY_SCAN;
|
||||||
private static int DELAY_MUTATE;
|
private static int DELAY_MUTATE;
|
||||||
private static int DELAY_BATCH_MUTATE;
|
private static int DELAY_BATCH;
|
||||||
|
private static int DELAY_META_SCAN;
|
||||||
|
|
||||||
private static final TableName TABLE_NAME = TableName.valueOf("Timeout");
|
private static final TableName TABLE_NAME = TableName.valueOf("Timeout");
|
||||||
private static final byte[] FAMILY = Bytes.toBytes("family");
|
private static final byte[] FAMILY = Bytes.toBytes("family");
|
||||||
|
@ -112,7 +111,8 @@ public class TestClientOperationTimeout {
|
||||||
DELAY_GET = 0;
|
DELAY_GET = 0;
|
||||||
DELAY_SCAN = 0;
|
DELAY_SCAN = 0;
|
||||||
DELAY_MUTATE = 0;
|
DELAY_MUTATE = 0;
|
||||||
DELAY_BATCH_MUTATE = 0;
|
DELAY_BATCH = 0;
|
||||||
|
DELAY_META_SCAN = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
@AfterClass
|
@AfterClass
|
||||||
|
@ -157,12 +157,12 @@ public class TestClientOperationTimeout {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Tests that a batch mutate on a table throws {@link SocketTimeoutException} when the operation
|
* Tests that a batch mutate and batch get on a table throws {@link SocketTimeoutException} when
|
||||||
* takes longer than 'hbase.client.operation.timeout'.
|
* the operation takes longer than 'hbase.client.operation.timeout'.
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testMultiPutsTimeout() {
|
public void testMultiTimeout() {
|
||||||
DELAY_BATCH_MUTATE = 600;
|
DELAY_BATCH = 600;
|
||||||
Put put1 = new Put(ROW);
|
Put put1 = new Put(ROW);
|
||||||
put1.addColumn(FAMILY, QUALIFIER, VALUE);
|
put1.addColumn(FAMILY, QUALIFIER, VALUE);
|
||||||
Put put2 = new Put(ROW);
|
Put put2 = new Put(ROW);
|
||||||
|
@ -176,6 +176,72 @@ public class TestClientOperationTimeout {
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
Assert.assertTrue(e instanceof SocketTimeoutException);
|
Assert.assertTrue(e instanceof SocketTimeoutException);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Get get1 = new Get(ROW);
|
||||||
|
get1.addColumn(FAMILY, QUALIFIER);
|
||||||
|
Get get2 = new Get(ROW);
|
||||||
|
get2.addColumn(FAMILY, QUALIFIER);
|
||||||
|
|
||||||
|
List<Get> gets = new ArrayList<>();
|
||||||
|
gets.add(get1);
|
||||||
|
gets.add(get2);
|
||||||
|
try {
|
||||||
|
TABLE.batch(gets, new Object[2]);
|
||||||
|
Assert.fail("should not reach here");
|
||||||
|
} catch (Exception e) {
|
||||||
|
Assert.assertTrue(e instanceof SocketTimeoutException);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests that a batch get on a table throws
|
||||||
|
* {@link org.apache.hadoop.hbase.client.OperationTimeoutExceededException} when the region lookup
|
||||||
|
* takes longer than the 'hbase.client.operation.timeout'
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testMultiGetMetaTimeout() throws IOException {
|
||||||
|
|
||||||
|
Configuration conf = new Configuration(UTIL.getConfiguration());
|
||||||
|
|
||||||
|
// the operation timeout must be lower than the delay from a meta scan to etch region locations
|
||||||
|
// of the get requests. Simply increasing the meta scan timeout to greater than the
|
||||||
|
// HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD will result in SocketTimeoutException on the scans thus
|
||||||
|
// avoiding the simulation of load on meta. See: HBASE-27487
|
||||||
|
conf.setLong(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 400);
|
||||||
|
conf.setBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, true);
|
||||||
|
try (Connection specialConnection = ConnectionFactory.createConnection(conf);
|
||||||
|
Table specialTable = specialConnection.getTable(TABLE_NAME)) {
|
||||||
|
|
||||||
|
MetricsConnection metrics =
|
||||||
|
((ConnectionImplementation) specialConnection).getConnectionMetrics();
|
||||||
|
long metaCacheNumClearServerPreFailure = metrics.metaCacheNumClearServer.getCount();
|
||||||
|
|
||||||
|
DELAY_META_SCAN = 400;
|
||||||
|
List<Get> gets = new ArrayList<>();
|
||||||
|
// we need to ensure the region look-ups eat up more time than the operation timeout without
|
||||||
|
// exceeding the scan timeout.
|
||||||
|
for (int i = 0; i < 100; i++) {
|
||||||
|
gets.add(new Get(Bytes.toBytes(i)).addColumn(FAMILY, QUALIFIER));
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
specialTable.get(gets);
|
||||||
|
Assert.fail("should not reach here");
|
||||||
|
} catch (Exception e) {
|
||||||
|
RetriesExhaustedWithDetailsException expected = (RetriesExhaustedWithDetailsException) e;
|
||||||
|
Assert.assertEquals(100, expected.getNumExceptions());
|
||||||
|
|
||||||
|
// verify we do not clear the cache in this situation otherwise we will create pathological
|
||||||
|
// feedback loop with multigets See: HBASE-27487
|
||||||
|
long metaCacheNumClearServerPostFailure = metrics.metaCacheNumClearServer.getCount();
|
||||||
|
Assert.assertEquals(metaCacheNumClearServerPreFailure, metaCacheNumClearServerPostFailure);
|
||||||
|
|
||||||
|
for (Throwable cause : expected.getCauses()) {
|
||||||
|
Assert.assertTrue(cause instanceof OperationTimeoutExceededException);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -240,7 +306,12 @@ public class TestClientOperationTimeout {
|
||||||
public ClientProtos.ScanResponse scan(RpcController controller,
|
public ClientProtos.ScanResponse scan(RpcController controller,
|
||||||
ClientProtos.ScanRequest request) throws ServiceException {
|
ClientProtos.ScanRequest request) throws ServiceException {
|
||||||
try {
|
try {
|
||||||
Thread.sleep(DELAY_SCAN);
|
String regionName = Bytes.toString(request.getRegion().getValue().toByteArray());
|
||||||
|
if (regionName.contains(TableName.META_TABLE_NAME.getNameAsString())) {
|
||||||
|
Thread.sleep(DELAY_META_SCAN);
|
||||||
|
} else {
|
||||||
|
Thread.sleep(DELAY_SCAN);
|
||||||
|
}
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
LOG.error("Sleep interrupted during scan operation", e);
|
LOG.error("Sleep interrupted during scan operation", e);
|
||||||
}
|
}
|
||||||
|
@ -251,7 +322,7 @@ public class TestClientOperationTimeout {
|
||||||
public ClientProtos.MultiResponse multi(RpcController rpcc, ClientProtos.MultiRequest request)
|
public ClientProtos.MultiResponse multi(RpcController rpcc, ClientProtos.MultiRequest request)
|
||||||
throws ServiceException {
|
throws ServiceException {
|
||||||
try {
|
try {
|
||||||
Thread.sleep(DELAY_BATCH_MUTATE);
|
Thread.sleep(DELAY_BATCH);
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
LOG.error("Sleep interrupted during multi operation", e);
|
LOG.error("Sleep interrupted during multi operation", e);
|
||||||
}
|
}
|
Loading…
Reference in New Issue