HBASE-17210 Set timeout on trying rowlock according to client's RPC timeout
This commit is contained in:
parent
3c0750de54
commit
a0da66dc36
|
@ -1341,7 +1341,7 @@ public abstract class RpcServer implements RpcServerInterface,
|
|||
* call.
|
||||
* @return An RpcCallContext backed by the currently ongoing call (gotten from a thread local)
|
||||
*/
|
||||
public static RpcCallContext getCurrentCall() {
|
||||
public static RpcCall getCurrentCall() {
|
||||
return CurCall.get();
|
||||
}
|
||||
|
||||
|
|
|
@ -123,6 +123,7 @@ import org.apache.hadoop.hbase.coprocessor.RegionObserver.MutationType;
|
|||
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare;
|
||||
import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
|
||||
import org.apache.hadoop.hbase.exceptions.RegionInRecoveryException;
|
||||
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
|
||||
import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
|
||||
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
|
||||
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
|
||||
|
@ -134,6 +135,7 @@ import org.apache.hadoop.hbase.io.TimeRange;
|
|||
import org.apache.hadoop.hbase.io.hfile.HFile;
|
||||
import org.apache.hadoop.hbase.ipc.CallerDisconnectedException;
|
||||
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
|
||||
import org.apache.hadoop.hbase.ipc.RpcCall;
|
||||
import org.apache.hadoop.hbase.ipc.RpcCallContext;
|
||||
import org.apache.hadoop.hbase.ipc.RpcServer;
|
||||
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
|
||||
|
@ -3166,6 +3168,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
RowLock rowLock = null;
|
||||
try {
|
||||
rowLock = getRowLockInternal(mutation.getRow(), true);
|
||||
} catch (TimeoutIOException e) {
|
||||
// We retry on other exceptions, but we must stop if we have timed out.
|
||||
throw e;
|
||||
} catch (IOException ioe) {
|
||||
LOG.warn("Failed getting lock, row=" + Bytes.toStringBinary(mutation.getRow()), ioe);
|
||||
}
|
||||
|
@ -5351,15 +5356,33 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
result = rowLockContext.newWriteLock();
|
||||
}
|
||||
}
|
||||
if (!result.getLock().tryLock(this.rowLockWaitDuration, TimeUnit.MILLISECONDS)) {
|
||||
|
||||
int timeout = rowLockWaitDuration;
|
||||
boolean reachDeadlineFirst = false;
|
||||
RpcCall call = RpcServer.getCurrentCall();
|
||||
if (call != null && call.getDeadline() < Long.MAX_VALUE) {
|
||||
int timeToDeadline = (int)(call.getDeadline() - System.currentTimeMillis());
|
||||
if (timeToDeadline <= this.rowLockWaitDuration) {
|
||||
reachDeadlineFirst = true;
|
||||
timeout = timeToDeadline;
|
||||
}
|
||||
}
|
||||
|
||||
if (timeout <= 0 || !result.getLock().tryLock(timeout, TimeUnit.MILLISECONDS)) {
|
||||
if (traceScope != null) {
|
||||
traceScope.getSpan().addTimelineAnnotation("Failed to get row lock");
|
||||
}
|
||||
result = null;
|
||||
// Clean up the counts just in case this was the thing keeping the context alive.
|
||||
rowLockContext.cleanUp();
|
||||
throw new IOException("Timed out waiting for lock for row: " + rowKey + " in region "
|
||||
+ getRegionInfo().getEncodedName());
|
||||
String message = "Timed out waiting for lock for row: " + rowKey + " in region "
|
||||
+ getRegionInfo().getEncodedName();
|
||||
if (reachDeadlineFirst) {
|
||||
throw new TimeoutIOException(message);
|
||||
} else {
|
||||
// rowLockWaitDuration expired before the RPC deadline was reached, so we cannot drop the request.
|
||||
throw new IOException(message);
|
||||
}
|
||||
}
|
||||
rowLockContext.setThreadName(Thread.currentThread().getName());
|
||||
return result;
|
||||
|
|
|
@ -0,0 +1,126 @@
|
|||
/*
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.Delete;
|
||||
import org.apache.hadoop.hbase.client.Get;
|
||||
import org.apache.hadoop.hbase.client.Increment;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
|
||||
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
|
||||
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
|
||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.junit.rules.TestName;
|
||||
|
||||
/**
 * Checks that a request blocked on a row lock gives up once the client's RPC timeout has
 * passed, so that the handler it occupies becomes available to other requests again.
 * The scenario is built from an Increment that is held up by {@link SleepCoprocessor},
 * a Delete on the same row with a 1000 ms RPC timeout, and a Get on a different row.
 */
@Category({LargeTests.class})
|
||||
public class TestSettingTimeoutOnBlockingPoint {
|
||||

||||
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||
// Column family; also reused as the qualifier in the increment below.
private static final byte[] FAM = Bytes.toBytes("f");
|
||||
// Row touched by both the Increment and the Delete.
private static final byte[] ROW1 = Bytes.toBytes("row1");
|
||||
// Row read by the final Get; not touched by the Increment or the Delete.
private static final byte[] ROW2 = Bytes.toBytes("row2");
|
||||

||||
// Supplies the running test method's name, used as the table name.
@Rule
|
||||
public TestName testName = new TestName();
|
||||

||||
/**
 * Configures the shared test utility (status publishing on, one client retry, two region
 * server handlers) and starts the mini cluster via {@code startMiniCluster(2)}.
 */
@BeforeClass
|
||||
public static void setUpBeforeClass() throws Exception {
|
||||
TEST_UTIL.getConfiguration().setBoolean(HConstants.STATUS_PUBLISHED, true);
|
||||
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
|
||||
// simulate queue blocking
|
||||
TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 2);
|
||||
TEST_UTIL.startMiniCluster(2);
|
||||
}
|
||||

||||
/**
 * Shuts the mini cluster down once all tests in this class have run.
 * NOTE(review): despite the "setUp" prefix this is the {@code @AfterClass} tear-down hook.
 */
@AfterClass
|
||||
public static void setUpAfterClass() throws Exception {
|
||||
TEST_UTIL.shutdownMiniCluster();
|
||||
}
|
||||

||||
/**
 * Region observer that sleeps for {@link #SLEEP_TIME} in the
 * {@code preIncrementAfterRowLock} hook before delegating to the base implementation.
 * Presumably the hook runs while the increment holds the row lock (per the hook name and
 * the comment in {@code testRowLock}) -- confirm against the coprocessor contract.
 */
public static class SleepCoprocessor extends BaseRegionObserver {
|
||||
// Value passed to Threads.sleep; presumably milliseconds -- TODO confirm.
public static final int SLEEP_TIME = 10000;
|
||||

||||
@Override
|
||||
public Result preIncrementAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> e,
|
||||
final Increment increment) throws IOException {
|
||||
Threads.sleep(SLEEP_TIME);
|
||||
return super.preIncrementAfterRowLock(e, increment);
|
||||
}
|
||||
}
|
||||

||||
/**
 * Starts an Increment on ROW1 (slowed down by {@link SleepCoprocessor}), then a Delete on
 * ROW1 with a 1000 ms RPC timeout, and finally issues a Get on ROW2 with a 1000 ms RPC
 * timeout from the test thread. The Get is expected to succeed; any exception it throws
 * propagates out of this method and fails the test.
 *
 * @throws IOException if creating the table or the final Get fails
 */
@Test
|
||||
public void testRowLock() throws IOException {
|
||||
TableName tableName = TableName.valueOf(testName.getMethodName());
|
||||
HTableDescriptor hdt = TEST_UTIL.createTableDescriptor(tableName);
|
||||
hdt.addCoprocessor(SleepCoprocessor.class.getName());
|
||||
TEST_UTIL.createTable(hdt, new byte[][]{FAM}, TEST_UTIL.getConfiguration());
|
||||

||||
// NOTE(review): Assert.fail below runs on a spawned thread; the AssertionError it throws is
// not propagated to the JUnit thread, so it does not by itself fail this test.
Thread incrementThread = new Thread(() -> {
|
||||
try {
|
||||
try( Table table = TEST_UTIL.getConnection().getTable(tableName)) {
|
||||
table.incrementColumnValue(ROW1, FAM, FAM, 1);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
Assert.fail(e.getMessage());
|
||||
}
|
||||
});
|
||||
// NOTE(review): named getThread, but it issues a Delete on ROW1. The comment further down
// expects this request to give up after its RPC timeout, yet the catch block calls
// Assert.fail -- confirm the intent.
Thread getThread = new Thread(() -> {
|
||||
try {
|
||||
try( Table table = TEST_UTIL.getConnection().getTable(tableName)) {
|
||||
table.setRpcTimeout(1000);
|
||||
Delete delete = new Delete(ROW1);
|
||||
table.delete(delete);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
Assert.fail(e.getMessage());
|
||||
}
|
||||
});
|
||||

||||
// The sleeps order the requests: the Increment starts first, the Delete one second later,
// and the Get two seconds after that.
incrementThread.start();
|
||||
Threads.sleep(1000);
|
||||
getThread.start();
|
||||
Threads.sleep(2000);
|
||||
try (Table table = TEST_UTIL.getConnection().getTable(tableName)) {
|
||||
// We have only two handlers. The first thread will get a write lock for row1 and occupy
|
||||
// the first handler. The second thread needs a read lock for row1; it should quit after 1000
|
||||
// ms and give back the handler because it cannot get the lock in time.
|
||||
// So we can get the value using the second handler.
|
||||
table.setRpcTimeout(1000);
|
||||
table.get(new Get(ROW2)); // Throws an exception if the timeout check did not work
|
||||
} finally {
|
||||
incrementThread.interrupt();
|
||||
|
||||
getThread.interrupt();
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue