diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java index 62ee0abee08..51b89a9ec4d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java @@ -251,8 +251,8 @@ class AsyncBatchRpcRetryingCaller { @SuppressWarnings("unchecked") private void onComplete(Action action, RegionRequest regionReq, int tries, ServerName serverName, - RegionResult regionResult, List failedActions) { - Object result = regionResult.result.get(action.getOriginalIndex()); + RegionResult regionResult, List failedActions, Throwable regionException) { + Object result = regionResult.result.getOrDefault(action.getOriginalIndex(), regionException); if (result == null) { LOG.error("Server " + serverName + " sent us neither result nor exception for row '" + Bytes.toStringBinary(action.getAction().getRow()) + "' of " @@ -279,27 +279,28 @@ class AsyncBatchRpcRetryingCaller { List failedActions = new ArrayList<>(); actionsByRegion.forEach((rn, regionReq) -> { RegionResult regionResult = resp.getResults().get(rn); + Throwable regionException = resp.getException(rn); if (regionResult != null) { regionReq.actions.forEach( - action -> onComplete(action, regionReq, tries, serverName, regionResult, failedActions)); + action -> onComplete(action, regionReq, tries, serverName, regionResult, failedActions, + regionException)); } else { - Throwable t = resp.getException(rn); Throwable error; - if (t == null) { + if (regionException == null) { LOG.error( "Server sent us neither results nor exceptions for " + Bytes.toStringBinary(rn)); error = new RuntimeException("Invalid response"); } else { - error = translateException(t); - logException(tries, () -> Stream.of(regionReq), error, serverName); - conn.getLocator().updateCachedLocation(regionReq.loc, error); - if (error instanceof DoNotRetryIOException || tries >= maxAttempts) { - failAll(regionReq.actions.stream(), tries, error, serverName); - return; - } - addError(regionReq.actions, error, serverName); - failedActions.addAll(regionReq.actions); + error = translateException(regionException); } + logException(tries, () -> Stream.of(regionReq), error, serverName); + conn.getLocator().updateCachedLocation(regionReq.loc, error); + if (error instanceof DoNotRetryIOException || tries >= maxAttempts) { + failAll(regionReq.actions.stream(), tries, error, serverName); + return; + } + addError(regionReq.actions, error, serverName); + failedActions.addAll(regionReq.actions); } }); if (!failedActions.isEmpty()) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java index 3ab94c50499..ace74f97ea0 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java @@ -19,8 +19,6 @@ package org.apache.hadoop.hbase.client; -import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; - import java.io.IOException; import java.io.InterruptedIOException; import java.util.ArrayList; @@ -36,28 +34,29 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; - import org.apache.hadoop.hbase.CallQueueTooBigException; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.RetryImmediatelyException; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.trace.TraceUtil; -import org.apache.yetus.audience.InterfaceAudience; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.hbase.client.backoff.ServerStatistics; import org.apache.hadoop.hbase.client.coprocessor.Batch; import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil; -import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; +import org.apache.hadoop.hbase.trace.TraceUtil; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.htrace.core.Tracer; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; + +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; /** * The context, and return value, for a single submit/submitAll call. @@ -152,7 +151,7 @@ class AsyncRequestFutureImpl implements AsyncRequestFuture { if (results[index] != null) return; // We set the number of calls here. After that any path must call setResult/setError. // True even for replicas that are not found - if we refuse to send we MUST set error. - results[index] = new ReplicaResultState(locs.length); + updateResult(index, new ReplicaResultState(locs.length)); } for (int i = 1; i < locs.length; ++i) { Action replicaAction = new Action(action, i); @@ -234,7 +233,7 @@ class AsyncRequestFutureImpl implements AsyncRequestFuture { } else { if (results != null) { SingleResponse singleResponse = (SingleResponse) res; - results[0] = singleResponse.getEntry(); + updateResult(0, singleResponse.getEntry()); } decActionCounter(1); } @@ -706,27 +705,17 @@ class AsyncRequestFutureImpl implements AsyncRequestFuture { Retry canRetry = errorsByServer.canTryMore(numAttempt) ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED; - if (tableName == null && ClientExceptionsUtil.isMetaClearingException(t)) { - // tableName is null when we made a cross-table RPC call. - asyncProcess.connection.clearCaches(server); - } - int failed = 0, stopped = 0; + cleanServerCache(server, t); + int failed = 0; + int stopped = 0; List toReplay = new ArrayList<>(); for (Map.Entry> e : rsActions.actions.entrySet()) { byte[] regionName = e.getKey(); - byte[] row = e.getValue().iterator().next().getAction().getRow(); + byte[] row = e.getValue().get(0).getAction().getRow(); // Do not use the exception for updating cache because it might be coming from // any of the regions in the MultiAction. - try { - if (tableName != null) { - asyncProcess.connection.updateCachedLocations(tableName, regionName, row, - ClientExceptionsUtil.isMetaClearingException(t) ? null : t, server); - } - } catch (Throwable ex) { - // That should never happen, but if it did, we want to make sure - // we still process errors - LOG.error("Couldn't update cached region locations: " + ex); - } + updateCachedLocations(server, regionName, row, + ClientExceptionsUtil.isMetaClearingException(t) ? null : t); for (Action action : e.getValue()) { Retry retry = manageError( action.getOriginalIndex(), action.getAction(), canRetry, t, server); @@ -819,6 +808,9 @@ class AsyncRequestFutureImpl implements AsyncRequestFuture { ServerName server, MultiResponse responses, int numAttempt) { assert responses != null; + Map results = responses.getResults(); + updateStats(server, results); + // Success or partial success // Analyze detailed results. We can still have individual failures to be redo. // two specific throwables are managed: @@ -826,126 +818,111 @@ class AsyncRequestFutureImpl implements AsyncRequestFuture { // - RegionMovedException: we update the cache with the new region location List toReplay = new ArrayList<>(); - Throwable throwable = null; + Throwable lastException = null; int failureCount = 0; - boolean canRetry = true; - - Map results = responses.getResults(); - updateStats(server, results); - - int failed = 0, stopped = 0; + int failed = 0; + int stopped = 0; + Retry retry = null; // Go by original action. for (Map.Entry> regionEntry : multiAction.actions.entrySet()) { byte[] regionName = regionEntry.getKey(); - Map regionResults = results.get(regionName) == null - ? null : results.get(regionName).result; - if (regionResults == null) { - if (!responses.getExceptions().containsKey(regionName)) { - LOG.error("Server sent us neither results nor exceptions for " - + Bytes.toStringBinary(regionName)); - responses.getExceptions().put(regionName, new RuntimeException("Invalid response")); - } - continue; - } + + Throwable regionException = responses.getExceptions().get(regionName); + cleanServerCache(server, regionException); + + Map regionResults = + results.containsKey(regionName) ? results.get(regionName).result : Collections.emptyMap(); boolean regionFailureRegistered = false; for (Action sentAction : regionEntry.getValue()) { Object result = regionResults.get(sentAction.getOriginalIndex()); + if (result == null) { + if (regionException == null) { + LOG.error("Server sent us neither results nor exceptions for " + + Bytes.toStringBinary(regionName) + + ", numAttempt:" + numAttempt); + regionException = new RuntimeException("Invalid response"); + } + // If the row operation encounters the region-lever error, the exception of action may be + // null. + result = regionException; + } // Failure: retry if it's make sense else update the errors lists - if (result == null || result instanceof Throwable) { + if (result instanceof Throwable) { + Throwable actionException = (Throwable) result; Row row = sentAction.getAction(); - throwable = ClientExceptionsUtil.findException(result); + lastException = regionException != null ? regionException + : ClientExceptionsUtil.findException(actionException); // Register corresponding failures once per server/once per region. if (!regionFailureRegistered) { regionFailureRegistered = true; - try { - asyncProcess.connection.updateCachedLocations( - tableName, regionName, row.getRow(), result, server); - } catch (Throwable ex) { - // That should never happen, but if it did, we want to make sure - // we still process errors - LOG.error("Couldn't update cached region locations: " + ex); - } + updateCachedLocations(server, regionName, row.getRow(), actionException); } - if (failureCount == 0) { + if (retry == null) { errorsByServer.reportServerError(server); // We determine canRetry only once for all calls, after reporting server failure. - canRetry = errorsByServer.canTryMore(numAttempt); + retry = errorsByServer.canTryMore(numAttempt) ? + Retry.YES : Retry.NO_RETRIES_EXHAUSTED; } ++failureCount; - Retry retry = manageError(sentAction.getOriginalIndex(), row, - canRetry ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED, (Throwable) result, server); - if (retry == Retry.YES) { - toReplay.add(sentAction); - } else if (retry == Retry.NO_OTHER_SUCCEEDED) { - ++stopped; - } else { - ++failed; + switch (manageError(sentAction.getOriginalIndex(), row, retry, actionException, + server)) { + case YES: + toReplay.add(sentAction); + break; + case NO_OTHER_SUCCEEDED: + ++stopped; + break; + default: + ++failed; + break; } } else { - if (callback != null) { - try { - //noinspection unchecked - // TODO: would callback expect a replica region name if it gets one? - this.callback.update(regionName, sentAction.getAction().getRow(), (CResult) result); - } catch (Throwable t) { - LOG.error("User callback threw an exception for " - + Bytes.toStringBinary(regionName) + ", ignoring", t); - } - } + invokeCallBack(regionName, sentAction.getAction().getRow(), (CResult) result); setResult(sentAction, result); } } } + if (toReplay.isEmpty()) { + logNoResubmit(server, numAttempt, failureCount, lastException, failed, stopped); + } else { + resubmit(server, toReplay, numAttempt, failureCount, lastException); + } + } - // The failures global to a region. We will use for multiAction we sent previously to find the - // actions to replay. - for (Map.Entry throwableEntry : responses.getExceptions().entrySet()) { - throwable = throwableEntry.getValue(); - byte[] region = throwableEntry.getKey(); - List actions = multiAction.actions.get(region); - if (actions == null || actions.isEmpty()) { - throw new IllegalStateException("Wrong response for the region: " + - HRegionInfo.encodeRegionName(region)); - } + private void updateCachedLocations(ServerName server, byte[] regionName, byte[] row, + Throwable rowException) { + if (tableName == null) { + return; + } + try { + asyncProcess.connection + .updateCachedLocations(tableName, regionName, row, rowException, server); + } catch (Throwable ex) { + // That should never happen, but if it did, we want to make sure + // we still process errors + LOG.error("Couldn't update cached region locations: " + ex); + } + } - if (failureCount == 0) { - errorsByServer.reportServerError(server); - canRetry = errorsByServer.canTryMore(numAttempt); - } - if (null == tableName && ClientExceptionsUtil.isMetaClearingException(throwable)) { - // For multi-actions, we don't have a table name, but we want to make sure to clear the - // cache in case there were location-related exceptions. We don't to clear the cache - // for every possible exception that comes through, however. - asyncProcess.connection.clearCaches(server); - } else { - try { - asyncProcess.connection.updateCachedLocations( - tableName, region, actions.get(0).getAction().getRow(), throwable, server); - } catch (Throwable ex) { - // That should never happen, but if it did, we want to make sure - // we still process errors - LOG.error("Couldn't update cached region locations: " + ex); - } - } - failureCount += actions.size(); - - for (Action action : actions) { - Row row = action.getAction(); - Retry retry = manageError(action.getOriginalIndex(), row, - canRetry ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED, throwable, server); - if (retry == Retry.YES) { - toReplay.add(action); - } else if (retry == Retry.NO_OTHER_SUCCEEDED) { - ++stopped; - } else { - ++failed; - } + private void invokeCallBack(byte[] regionName, byte[] row, CResult result) { + if (callback != null) { + try { + //noinspection unchecked + // TODO: would callback expect a replica region name if it gets one? + this.callback.update(regionName, row, result); + } catch (Throwable t) { + LOG.error("User callback threw an exception for " + + Bytes.toStringBinary(regionName) + ", ignoring", t); } } - if (toReplay.isEmpty()) { - logNoResubmit(server, numAttempt, failureCount, throwable, failed, stopped); - } else { - resubmit(server, toReplay, numAttempt, failureCount, throwable); + } + + private void cleanServerCache(ServerName server, Throwable regionException) { + if (tableName == null && ClientExceptionsUtil.isMetaClearingException(regionException)) { + // For multi-actions, we don't have a table name, but we want to make sure to clear the + // cache in case there were location-related exceptions. We don't to clear the cache + // for every possible exception that comes through, however. + asyncProcess.connection.clearCaches(server); } } @@ -1041,7 +1018,7 @@ class AsyncRequestFutureImpl implements AsyncRequestFuture { if (results[index] != state) { throw new AssertionError("We set the callCount but someone else replaced the result"); } - results[index] = result; + updateResult(index, result); } decActionCounter(index); @@ -1099,7 +1076,7 @@ class AsyncRequestFutureImpl implements AsyncRequestFuture { if (results[index] != state) { throw new AssertionError("We set the callCount but someone else replaced the result"); } - results[index] = throwable; + updateResult(index, throwable); } decActionCounter(index); } @@ -1130,7 +1107,7 @@ class AsyncRequestFutureImpl implements AsyncRequestFuture { if (isFromReplica) { throw new AssertionError("Unexpected stale result for " + row); } - results[index] = result; + updateResult(index, result); } else { synchronized (replicaResultLock) { resObj = results[index]; @@ -1138,7 +1115,7 @@ class AsyncRequestFutureImpl implements AsyncRequestFuture { if (isFromReplica) { throw new AssertionError("Unexpected stale result for " + row); } - results[index] = result; + updateResult(index, result); } } } @@ -1276,4 +1253,20 @@ class AsyncRequestFutureImpl implements AsyncRequestFuture { return new MultiServerCallable(asyncProcess.connection, tableName, server, multi, asyncProcess.rpcFactory.newController(), rpcTimeout, tracker, multi.getPriority()); } + + private void updateResult(int index, Object result) { + Object current = results[index]; + if (current != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("The result is assigned repeatedly! current:" + current + + ", new:" + result); + } + } + results[index] = result; + } + + @VisibleForTesting + long getNumberOfActionsInProgress() { + return actionsInProgress.get(); + } } diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcessWithRegionException.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcessWithRegionException.java new file mode 100644 index 00000000000..c46385e5a4d --- /dev/null +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcessWithRegionException.java @@ -0,0 +1,252 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.RegionLocations; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.mockito.Mockito; + +/** + * The purpose of this test is to make sure the region exception won't corrupt the results + * of batch. The prescription is shown below. + * 1) honor the action result rather than region exception. If the action have both of true result + * and region exception, the action is fine as the exception is caused by other actions + * which are in the same region. + * 2) honor the action exception rather than region exception. If the action have both of action + * exception and region exception, we deal with the action exception only. If we also + * handle the region exception for the same action, it will introduce the negative count of + * actions in progress. The AsyncRequestFuture#waitUntilDone will block forever. + * + * This bug can be reproduced by real use case. see TestMalformedCellFromClient(in branch-1.4+). + * It uses the batch of RowMutations to present the bug. Given that the batch of RowMutations is + * only supported by branch-1.4+, perhaps the branch-1.3 and branch-1.2 won't encounter this issue. + * We still backport the fix to branch-1.3 and branch-1.2 in case we ignore some write paths. + */ +@Category({ ClientTests.class, SmallTests.class }) +public class TestAsyncProcessWithRegionException { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestAsyncProcessWithRegionException.class); + + private static final Result EMPTY_RESULT = Result.create(null, true); + private static final IOException IOE = new IOException("YOU CAN'T PASS"); + private static final Configuration CONF = new Configuration(); + private static final TableName DUMMY_TABLE = TableName.valueOf("DUMMY_TABLE"); + private static final byte[] GOOD_ROW = Bytes.toBytes("GOOD_ROW"); + private static final byte[] BAD_ROW = Bytes.toBytes("BAD_ROW"); + private static final byte[] BAD_ROW_WITHOUT_ACTION_EXCEPTION = + Bytes.toBytes("BAD_ROW_WITHOUT_ACTION_EXCEPTION"); + private static final byte[] FAMILY = Bytes.toBytes("FAMILY"); + private static final ServerName SERVER_NAME = ServerName.valueOf("s1,1,1"); + private static final RegionInfo REGION_INFO = + RegionInfoBuilder.newBuilder(DUMMY_TABLE) + .setStartKey(HConstants.EMPTY_START_ROW) + .setEndKey(HConstants.EMPTY_END_ROW) + .setSplit(false) + .setRegionId(1) + .build(); + + private static final HRegionLocation REGION_LOCATION = + new HRegionLocation(REGION_INFO, SERVER_NAME); + + @BeforeClass + public static void setUpBeforeClass() { + // disable the retry + CONF.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0); + } + + @Test + public void testSuccessivePut() throws Exception { + MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF); + + List puts = new ArrayList<>(1); + puts.add(new Put(GOOD_ROW).addColumn(FAMILY, FAMILY, FAMILY)); + final int expectedSize = puts.size(); + AsyncRequestFuture arf = ap.submit(DUMMY_TABLE, puts); + arf.waitUntilDone(); + Object[] result = arf.getResults(); + assertEquals(expectedSize, result.length); + for (Object r : result) { + assertEquals(Result.class, r.getClass()); + } + assertTrue(puts.isEmpty()); + assertActionsInProgress(arf); + } + + @Test + public void testFailedPut() throws Exception { + MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF); + + List puts = new ArrayList<>(2); + puts.add(new Put(GOOD_ROW).addColumn(FAMILY, FAMILY, FAMILY)); + // this put should fail + puts.add(new Put(BAD_ROW).addColumn(FAMILY, FAMILY, FAMILY)); + final int expectedSize = puts.size(); + + AsyncRequestFuture arf = ap.submit(DUMMY_TABLE, puts); + arf.waitUntilDone(); + // There is a failed puts + assertError(arf, 1); + Object[] result = arf.getResults(); + assertEquals(expectedSize, result.length); + assertEquals(Result.class, result[0].getClass()); + assertTrue(result[1] instanceof IOException); + assertTrue(puts.isEmpty()); + assertActionsInProgress(arf); + } + + @Test + public void testFailedPutWithoutActionException() throws Exception { + MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF); + + List puts = new ArrayList<>(3); + puts.add(new Put(GOOD_ROW).addColumn(FAMILY, FAMILY, FAMILY)); + // this put should fail + puts.add(new Put(BAD_ROW).addColumn(FAMILY, FAMILY, FAMILY)); + // this put should fail, and it won't have action exception + puts.add(new Put(BAD_ROW_WITHOUT_ACTION_EXCEPTION).addColumn(FAMILY, FAMILY, FAMILY)); + final int expectedSize = puts.size(); + + AsyncRequestFuture arf = ap.submit(DUMMY_TABLE, puts); + arf.waitUntilDone(); + // There are two failed puts + assertError(arf, 2); + Object[] result = arf.getResults(); + assertEquals(expectedSize, result.length); + assertEquals(Result.class, result[0].getClass()); + assertTrue(result[1] instanceof IOException); + assertTrue(result[2] instanceof IOException); + assertTrue(puts.isEmpty()); + assertActionsInProgress(arf); + } + + private static void assertError(AsyncRequestFuture arf, int expectedCountOfFailure) { + assertTrue(arf.hasError()); + RetriesExhaustedWithDetailsException e = arf.getErrors(); + List errors = e.getCauses(); + assertEquals(expectedCountOfFailure, errors.size()); + for (Throwable t : errors) { + assertTrue(t instanceof IOException); + } + } + + private static void assertActionsInProgress(AsyncRequestFuture arf) { + if (arf instanceof AsyncRequestFutureImpl) { + assertEquals(0, ((AsyncRequestFutureImpl) arf).getNumberOfActionsInProgress()); + } + } + + private static ClusterConnection createHConnection() throws IOException { + ClusterConnection hc = Mockito.mock(ClusterConnection.class); + NonceGenerator ng = Mockito.mock(NonceGenerator.class); + Mockito.when(ng.getNonceGroup()).thenReturn(HConstants.NO_NONCE); + Mockito.when(hc.getNonceGenerator()).thenReturn(ng); + Mockito.when(hc.getConfiguration()).thenReturn(CONF); + Mockito.when(hc.getConnectionConfiguration()).thenReturn(new ConnectionConfiguration(CONF)); + setMockLocation(hc, GOOD_ROW, new RegionLocations(REGION_LOCATION)); + setMockLocation(hc, BAD_ROW, new RegionLocations(REGION_LOCATION)); + Mockito + .when(hc.locateRegions(Mockito.eq(DUMMY_TABLE), Mockito.anyBoolean(), Mockito.anyBoolean())) + .thenReturn(Collections.singletonList(REGION_LOCATION)); + return hc; + } + + private static void setMockLocation(ClusterConnection hc, byte[] row, RegionLocations result) + throws IOException { + Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE), Mockito.eq(row), Mockito.anyBoolean(), + Mockito.anyBoolean(), Mockito.anyInt())).thenReturn(result); + Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE), Mockito.eq(row), Mockito.anyBoolean(), + Mockito.anyBoolean())).thenReturn(result); + } + + private static class MyAsyncProcess extends AsyncProcess { + private final ExecutorService service = Executors.newFixedThreadPool(5); + + MyAsyncProcess(ClusterConnection hc, Configuration conf) { + super(hc, conf, new RpcRetryingCallerFactory(conf), false, new RpcControllerFactory(conf)); + } + + public AsyncRequestFuture submit(TableName tableName, List rows) + throws InterruptedIOException { + return submit(AsyncProcessTask.newBuilder() + .setPool(service) + .setTableName(tableName) + .setRowAccess(rows) + .setSubmittedRows(AsyncProcessTask.SubmittedRows.NORMAL) + .setNeedResults(true) + .setRpcTimeout(HConstants.DEFAULT_HBASE_RPC_TIMEOUT) + .setOperationTimeout(HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT) + .build()); + } + + @Override + protected RpcRetryingCaller createCaller( + CancellableRegionServerCallable callable, int rpcTimeout) { + MultiServerCallable callable1 = (MultiServerCallable) callable; + MultiResponse mr = new MultiResponse(); + callable1.getMulti().actions.forEach((regionName, actions) -> { + actions.forEach(action -> { + if (Bytes.equals(action.getAction().getRow(), GOOD_ROW)) { + mr.add(regionName, action.getOriginalIndex(), EMPTY_RESULT); + } else if (Bytes.equals(action.getAction().getRow(), BAD_ROW)) { + mr.add(regionName, action.getOriginalIndex(), IOE); + } + }); + }); + mr.addException(REGION_INFO.getRegionName(), IOE); + return new RpcRetryingCallerImpl(100, 500, 0, 9) { + @Override + public AbstractResponse callWithoutRetries(RetryingCallable callable, + int callTimeout) { + try { + // sleep one second in order for threadpool to start another thread instead of reusing + // existing one. + Thread.sleep(1000); + } catch (InterruptedException e) { + // pass + } + return mr; + } + }; + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMalformedCellFromClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMalformedCellFromClient.java new file mode 100644 index 00000000000..e44a2e91d98 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMalformedCellFromClient.java @@ -0,0 +1,173 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * The purpose of this test is to make sure the region exception won't corrupt the results + * of batch. The prescription is shown below. + * 1) honor the action result rather than region exception. If the action have both of true result + * and region exception, the action is fine as the exception is caused by other actions + * which are in the same region. + * 2) honor the action exception rather than region exception. If the action have both of action + * exception and region exception, we deal with the action exception only. If we also + * handle the region exception for the same action, it will introduce the negative count of + * actions in progress. The AsyncRequestFuture#waitUntilDone will block forever. + * + * The no-cluster test is in TestAsyncProcessWithRegionException. + */ +@Category({ MediumTests.class, ClientTests.class }) +public class TestMalformedCellFromClient { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestMalformedCellFromClient.class); + + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static final byte[] FAMILY = Bytes.toBytes("testFamily"); + private static final int CELL_SIZE = 100; + private static final TableName TABLE_NAME = TableName.valueOf("TestMalformedCellFromClient"); + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + // disable the retry + TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0); + TEST_UTIL.startMiniCluster(1); + } + + @Before + public void before() throws Exception { + TableDescriptor desc = TableDescriptorBuilder.newBuilder(TABLE_NAME) + .addColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)) + .setValue(HRegion.HBASE_MAX_CELL_SIZE_KEY, String.valueOf(CELL_SIZE)).build(); + TEST_UTIL.getConnection().getAdmin().createTable(desc); + } + + @After + public void tearDown() throws Exception { + for (TableDescriptor htd : TEST_UTIL.getAdmin().listTableDescriptors()) { + TEST_UTIL.deleteTable(htd.getTableName()); + } + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + /** + * The purpose of this ut is to check the consistency between the exception and results. + * If the RetriesExhaustedWithDetailsException contains the whole batch, + * each result should be of IOE. Otherwise, the row operation which is not in the exception + * should have a true result. + */ + @Test + public void testRegionException() throws InterruptedException, IOException { + List batches = new ArrayList<>(); + batches.add(new Put(Bytes.toBytes("good")).addColumn(FAMILY, null, new byte[10])); + // the rm is used to prompt the region exception. + // see RSRpcServices#multi + RowMutations rm = new RowMutations(Bytes.toBytes("fail")); + rm.add(new Put(rm.getRow()).addColumn(FAMILY, null, new byte[CELL_SIZE])); + batches.add(rm); + Object[] results = new Object[batches.size()]; + + try (Table table = TEST_UTIL.getConnection().getTable(TABLE_NAME)) { + Throwable exceptionByCaught = null; + try { + table.batch(batches, results); + fail("Where is the exception? We put the malformed cells!!!"); + } catch (RetriesExhaustedWithDetailsException e) { + for (Throwable throwable : e.getCauses()) { + assertNotNull(throwable); + } + assertEquals(1, e.getNumExceptions()); + exceptionByCaught = e.getCause(0); + } + for (Object obj : results) { + assertNotNull(obj); + } + assertEquals(Result.class, results[0].getClass()); + assertEquals(exceptionByCaught.getClass(), results[1].getClass()); + Result result = table.get(new Get(Bytes.toBytes("good"))); + assertEquals(1, result.size()); + Cell cell = result.getColumnLatestCell(FAMILY, null); + assertTrue(Bytes.equals(CellUtil.cloneValue(cell), new byte[10])); + } + } + + /** + * The purpose of this ut is to check the consistency between the exception and results. + * If the RetriesExhaustedWithDetailsException contains the whole batch, + * each result should be of IOE. Otherwise, the row operation which is not in the exception + * should have a true result. + */ + @Test + public void testRegionExceptionByAsync() throws Exception { + List batches = new ArrayList<>(); + batches.add(new Put(Bytes.toBytes("good")).addColumn(FAMILY, null, new byte[10])); + // the rm is used to prompt the region exception. + // see RSRpcServices#multi + RowMutations rm = new RowMutations(Bytes.toBytes("fail")); + rm.add(new Put(rm.getRow()).addColumn(FAMILY, null, new byte[CELL_SIZE])); + batches.add(rm); + try (AsyncConnection asyncConnection = ConnectionFactory + .createAsyncConnection(TEST_UTIL.getConfiguration()).get()) { + AsyncTable table = asyncConnection.getTable(TABLE_NAME); + List> results = table.batch(batches); + assertEquals(2, results.size()); + try { + results.get(1).get(); + fail("Where is the exception? We put the malformed cells!!!"); + } catch (ExecutionException e) { + // pass + } + Result result = table.get(new Get(Bytes.toBytes("good"))).get(); + assertEquals(1, result.size()); + Cell cell = result.getColumnLatestCell(FAMILY, null); + assertTrue(Bytes.equals(CellUtil.cloneValue(cell), new byte[10])); + } + } +}