diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index b3e1adb1574..7813bf4191b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -1341,7 +1341,7 @@ public abstract class RpcServer implements RpcServerInterface, * call. * @return An RpcCallContext backed by the currently ongoing call (gotten from a thread local) */ - public static RpcCallContext getCurrentCall() { + public static RpcCall getCurrentCall() { return CurCall.get(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index c3db58869ba..8f64979a6a1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -123,6 +123,7 @@ import org.apache.hadoop.hbase.coprocessor.RegionObserver.MutationType; import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare; import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException; import org.apache.hadoop.hbase.exceptions.RegionInRecoveryException; +import org.apache.hadoop.hbase.exceptions.TimeoutIOException; import org.apache.hadoop.hbase.exceptions.UnknownProtocolException; import org.apache.hadoop.hbase.filter.ByteArrayComparable; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; @@ -134,6 +135,7 @@ import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.ipc.CallerDisconnectedException; import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; +import org.apache.hadoop.hbase.ipc.RpcCall; import org.apache.hadoop.hbase.ipc.RpcCallContext; import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.monitoring.MonitoredTask; @@ -3166,6 +3168,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi RowLock rowLock = null; try { rowLock = getRowLockInternal(mutation.getRow(), true); + } catch (TimeoutIOException e) { + // We will retry when other exceptions, but we should stop if we timeout . + throw e; } catch (IOException ioe) { LOG.warn("Failed getting lock, row=" + Bytes.toStringBinary(mutation.getRow()), ioe); } @@ -5351,15 +5356,33 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi result = rowLockContext.newWriteLock(); } } - if (!result.getLock().tryLock(this.rowLockWaitDuration, TimeUnit.MILLISECONDS)) { + + int timeout = rowLockWaitDuration; + boolean reachDeadlineFirst = false; + RpcCall call = RpcServer.getCurrentCall(); + if (call != null && call.getDeadline() < Long.MAX_VALUE) { + int timeToDeadline = (int)(call.getDeadline() - System.currentTimeMillis()); + if (timeToDeadline <= this.rowLockWaitDuration) { + reachDeadlineFirst = true; + timeout = timeToDeadline; + } + } + + if (timeout <= 0 || !result.getLock().tryLock(timeout, TimeUnit.MILLISECONDS)) { if (traceScope != null) { traceScope.getSpan().addTimelineAnnotation("Failed to get row lock"); } result = null; // Clean up the counts just in case this was the thing keeping the context alive. rowLockContext.cleanUp(); - throw new IOException("Timed out waiting for lock for row: " + rowKey + " in region " - + getRegionInfo().getEncodedName()); + String message = "Timed out waiting for lock for row: " + rowKey + " in region " + + getRegionInfo().getEncodedName(); + if (reachDeadlineFirst) { + throw new TimeoutIOException(message); + } else { + // If timeToDeadline is larger than rowLockWaitDuration, we can not drop the request. + throw new IOException(message); + } } rowLockContext.setThreadName(Thread.currentThread().getName()); return result; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSettingTimeoutOnBlockingPoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSettingTimeoutOnBlockingPoint.java new file mode 100644 index 00000000000..ab59ed01fb8 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSettingTimeoutOnBlockingPoint.java @@ -0,0 +1,126 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; + +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Increment; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Threads; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +@Category({LargeTests.class}) +public class TestSettingTimeoutOnBlockingPoint { + + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static final byte[] FAM = Bytes.toBytes("f"); + private static final byte[] ROW1 = Bytes.toBytes("row1"); + private static final byte[] ROW2 = Bytes.toBytes("row2"); + + @Rule + public TestName testName = new TestName(); + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.getConfiguration().setBoolean(HConstants.STATUS_PUBLISHED, true); + TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); + // simulate queue blocking + TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 2); + TEST_UTIL.startMiniCluster(2); + } + + @AfterClass + public static void setUpAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + public static class SleepCoprocessor extends BaseRegionObserver { + public static final int SLEEP_TIME = 10000; + + @Override + public Result preIncrementAfterRowLock(final ObserverContext e, + final Increment increment) throws IOException { + Threads.sleep(SLEEP_TIME); + return super.preIncrementAfterRowLock(e, increment); + } + } + + @Test + public void testRowLock() throws IOException { + TableName tableName = TableName.valueOf(testName.getMethodName()); + HTableDescriptor hdt = TEST_UTIL.createTableDescriptor(tableName); + hdt.addCoprocessor(SleepCoprocessor.class.getName()); + TEST_UTIL.createTable(hdt, new byte[][]{FAM}, TEST_UTIL.getConfiguration()); + + Thread incrementThread = new Thread(() -> { + try { + try( Table table = TEST_UTIL.getConnection().getTable(tableName)) { + table.incrementColumnValue(ROW1, FAM, FAM, 1); + } + } catch (IOException e) { + Assert.fail(e.getMessage()); + } + }); + Thread getThread = new Thread(() -> { + try { + try( Table table = TEST_UTIL.getConnection().getTable(tableName)) { + table.setRpcTimeout(1000); + Delete delete = new Delete(ROW1); + table.delete(delete); + } + } catch (IOException e) { + Assert.fail(e.getMessage()); + } + }); + + incrementThread.start(); + Threads.sleep(1000); + getThread.start(); + Threads.sleep(2000); + try (Table table = TEST_UTIL.getConnection().getTable(tableName)) { + // We have only two handlers. The first thread will get a write lock for row1 and occupy + // the first handler. The second thread need a read lock for row1, it should quit after 1000 + // ms and give back the handler because it can not get the lock in time. + // So we can get the value using the second handler. + table.setRpcTimeout(1000); + table.get(new Get(ROW2)); // Will throw exception if the timeout checking is failed + } finally { + incrementThread.interrupt(); + getThread.interrupt(); + } + } +}