From cbdbe6572b7813e756bca6dcf0a4b7ec3238233a Mon Sep 17 00:00:00 2001 From: Duo Zhang Date: Fri, 1 Mar 2019 18:56:48 +0800 Subject: [PATCH] HBASE-21976 Deal with RetryImmediatelyException for batching request Signed-off-by: Guanghao Zhang --- .../client/AsyncBatchRpcRetryingCaller.java | 30 ++++-- .../TestAsyncTableBatchRetryImmediately.java | 101 ++++++++++++++++++ 2 files changed, 123 insertions(+), 8 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatchRetryImmediately.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java index b62ba6cd0ae..f9bcf74888f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java @@ -44,10 +44,12 @@ import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; +import org.apache.commons.lang3.mutable.MutableBoolean; import org.apache.hadoop.hbase.CellScannable; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.RetryImmediatelyException; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.MultiResponse.RegionResult; @@ -267,7 +269,8 @@ class AsyncBatchRpcRetryingCaller { @SuppressWarnings("unchecked") private void onComplete(Action action, RegionRequest regionReq, int tries, ServerName serverName, - RegionResult regionResult, List failedActions, Throwable regionException) { + RegionResult regionResult, List failedActions, Throwable regionException, + MutableBoolean retryImmediately) { Object result = regionResult.result.getOrDefault(action.getOriginalIndex(), regionException); if (result == null) { LOG.error("Server " + serverName + " sent us neither result nor exception for row '" + @@ -283,6 +286,9 @@ class AsyncBatchRpcRetryingCaller { failOne(action, tries, error, EnvironmentEdgeManager.currentTime(), getExtraContextForError(serverName)); } else { + if (!retryImmediately.booleanValue() && error instanceof RetryImmediatelyException) { + retryImmediately.setTrue(); + } failedActions.add(action); } } else { @@ -293,17 +299,18 @@ class AsyncBatchRpcRetryingCaller { private void onComplete(Map actionsByRegion, int tries, ServerName serverName, MultiResponse resp) { List failedActions = new ArrayList<>(); + MutableBoolean retryImmediately = new MutableBoolean(false); actionsByRegion.forEach((rn, regionReq) -> { RegionResult regionResult = resp.getResults().get(rn); Throwable regionException = resp.getException(rn); if (regionResult != null) { regionReq.actions.forEach(action -> onComplete(action, regionReq, tries, serverName, - regionResult, failedActions, regionException)); + regionResult, failedActions, regionException, retryImmediately)); } else { Throwable error; if (regionException == null) { - LOG - .error("Server sent us neither results nor exceptions for " + Bytes.toStringBinary(rn)); + LOG.error("Server sent us neither results nor exceptions for {}", + Bytes.toStringBinary(rn)); error = new RuntimeException("Invalid response"); } else { error = translateException(regionException); @@ -314,12 +321,15 @@ class AsyncBatchRpcRetryingCaller { failAll(regionReq.actions.stream(), tries, error, serverName); return; } + if (!retryImmediately.booleanValue() && error instanceof RetryImmediatelyException) { + retryImmediately.setTrue(); + } addError(regionReq.actions, error, serverName); failedActions.addAll(regionReq.actions); } }); if (!failedActions.isEmpty()) { - tryResubmit(failedActions.stream(), tries); + tryResubmit(failedActions.stream(), tries, retryImmediately.booleanValue()); } } @@ -391,10 +401,14 @@ class AsyncBatchRpcRetryingCaller { List copiedActions = actionsByRegion.values().stream().flatMap(r -> r.actions.stream()) .collect(Collectors.toList()); addError(copiedActions, error, serverName); - tryResubmit(copiedActions.stream(), tries); + tryResubmit(copiedActions.stream(), tries, error instanceof RetryImmediatelyException); } - private void tryResubmit(Stream actions, int tries) { + private void tryResubmit(Stream actions, int tries, boolean immediately) { + if (immediately) { + groupAndSend(actions, tries); + return; + } long delayNs; if (operationTimeoutNs > 0) { long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS; @@ -443,7 +457,7 @@ class AsyncBatchRpcRetryingCaller { send(actionsByServer, tries); } if (!locateFailed.isEmpty()) { - tryResubmit(locateFailed.stream(), tries); + tryResubmit(locateFailed.stream(), tries, false); } }); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatchRetryImmediately.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatchRetryImmediately.java new file mode 100644 index 00000000000..57cfbeca6e2 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatchRetryImmediately.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.log4j.Level; +import org.apache.log4j.LogManager; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ MediumTests.class, ClientTests.class }) +public class TestAsyncTableBatchRetryImmediately { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestAsyncTableBatchRetryImmediately.class); + + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + + private static TableName TABLE_NAME = TableName.valueOf("async"); + + private static byte[] FAMILY = Bytes.toBytes("cf"); + + private static byte[] QUAL = Bytes.toBytes("cq"); + + private static byte[] VALUE_PREFIX = new byte[768]; + + private static int COUNT = 1000; + + private static AsyncConnection CONN; + + @BeforeClass + public static void setUp() throws Exception { + // disable the debug log to avoid flooding the output + LogManager.getLogger(AsyncRegionLocatorHelper.class).setLevel(Level.INFO); + UTIL.getConfiguration().setLong(HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY, 1024); + UTIL.startMiniCluster(1); + Table table = UTIL.createTable(TABLE_NAME, FAMILY); + UTIL.waitTableAvailable(TABLE_NAME); + ThreadLocalRandom.current().nextBytes(VALUE_PREFIX); + for (int i = 0; i < COUNT; i++) { + table.put(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUAL, + Bytes.add(VALUE_PREFIX, Bytes.toBytes(i)))); + } + CONN = ConnectionFactory.createAsyncConnection(UTIL.getConfiguration()).get(); + } + + @AfterClass + public static void tearDown() throws Exception { + CONN.close(); + UTIL.shutdownMiniCluster(); + } + + @Test + public void test() { + AsyncTable table = CONN.getTable(TABLE_NAME); + // if we do not deal with RetryImmediatelyException, we will timeout here since we need to retry + // hundreds times. + List gets = IntStream.range(0, COUNT).mapToObj(i -> new Get(Bytes.toBytes(i))) + .collect(Collectors.toList()); + List results = table.getAll(gets).join(); + for (int i = 0; i < COUNT; i++) { + byte[] value = results.get(i).getValue(FAMILY, QUAL); + assertEquals(VALUE_PREFIX.length + 4, value.length); + assertArrayEquals(VALUE_PREFIX, Arrays.copyOf(value, VALUE_PREFIX.length)); + assertEquals(i, Bytes.toInt(value, VALUE_PREFIX.length)); + } + } +}