HBASE-19900 Region-level exception destroy the result of batch
This commit is contained in:
parent
f481386e3e
commit
e2636c81f7
|
@ -251,8 +251,8 @@ class AsyncBatchRpcRetryingCaller<T> {
|
|||
|
||||
@SuppressWarnings("unchecked")
|
||||
private void onComplete(Action action, RegionRequest regionReq, int tries, ServerName serverName,
|
||||
RegionResult regionResult, List<Action> failedActions) {
|
||||
Object result = regionResult.result.get(action.getOriginalIndex());
|
||||
RegionResult regionResult, List<Action> failedActions, Throwable regionException) {
|
||||
Object result = regionResult.result.getOrDefault(action.getOriginalIndex(), regionException);
|
||||
if (result == null) {
|
||||
LOG.error("Server " + serverName + " sent us neither result nor exception for row '"
|
||||
+ Bytes.toStringBinary(action.getAction().getRow()) + "' of "
|
||||
|
@ -279,27 +279,28 @@ class AsyncBatchRpcRetryingCaller<T> {
|
|||
List<Action> failedActions = new ArrayList<>();
|
||||
actionsByRegion.forEach((rn, regionReq) -> {
|
||||
RegionResult regionResult = resp.getResults().get(rn);
|
||||
Throwable regionException = resp.getException(rn);
|
||||
if (regionResult != null) {
|
||||
regionReq.actions.forEach(
|
||||
action -> onComplete(action, regionReq, tries, serverName, regionResult, failedActions));
|
||||
action -> onComplete(action, regionReq, tries, serverName, regionResult, failedActions,
|
||||
regionException));
|
||||
} else {
|
||||
Throwable t = resp.getException(rn);
|
||||
Throwable error;
|
||||
if (t == null) {
|
||||
if (regionException == null) {
|
||||
LOG.error(
|
||||
"Server sent us neither results nor exceptions for " + Bytes.toStringBinary(rn));
|
||||
error = new RuntimeException("Invalid response");
|
||||
} else {
|
||||
error = translateException(t);
|
||||
logException(tries, () -> Stream.of(regionReq), error, serverName);
|
||||
conn.getLocator().updateCachedLocation(regionReq.loc, error);
|
||||
if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {
|
||||
failAll(regionReq.actions.stream(), tries, error, serverName);
|
||||
return;
|
||||
}
|
||||
addError(regionReq.actions, error, serverName);
|
||||
failedActions.addAll(regionReq.actions);
|
||||
error = translateException(regionException);
|
||||
}
|
||||
logException(tries, () -> Stream.of(regionReq), error, serverName);
|
||||
conn.getLocator().updateCachedLocation(regionReq.loc, error);
|
||||
if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {
|
||||
failAll(regionReq.actions.stream(), tries, error, serverName);
|
||||
return;
|
||||
}
|
||||
addError(regionReq.actions, error, serverName);
|
||||
failedActions.addAll(regionReq.actions);
|
||||
}
|
||||
});
|
||||
if (!failedActions.isEmpty()) {
|
||||
|
|
|
@ -19,8 +19,6 @@
|
|||
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InterruptedIOException;
|
||||
import java.util.ArrayList;
|
||||
|
@ -36,28 +34,29 @@ import java.util.concurrent.ExecutorService;
|
|||
import java.util.concurrent.RejectedExecutionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import org.apache.hadoop.hbase.CallQueueTooBigException;
|
||||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.HRegionLocation;
|
||||
import org.apache.hadoop.hbase.RegionLocations;
|
||||
import org.apache.hadoop.hbase.RetryImmediatelyException;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.trace.TraceUtil;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
|
||||
import org.apache.hadoop.hbase.client.coprocessor.Batch;
|
||||
import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
|
||||
import org.apache.hadoop.hbase.trace.TraceUtil;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.htrace.core.Tracer;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
|
||||
|
||||
/**
|
||||
* The context, and return value, for a single submit/submitAll call.
|
||||
|
@ -152,7 +151,7 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
|
|||
if (results[index] != null) return;
|
||||
// We set the number of calls here. After that any path must call setResult/setError.
|
||||
// True even for replicas that are not found - if we refuse to send we MUST set error.
|
||||
results[index] = new ReplicaResultState(locs.length);
|
||||
updateResult(index, new ReplicaResultState(locs.length));
|
||||
}
|
||||
for (int i = 1; i < locs.length; ++i) {
|
||||
Action replicaAction = new Action(action, i);
|
||||
|
@ -234,7 +233,7 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
|
|||
} else {
|
||||
if (results != null) {
|
||||
SingleResponse singleResponse = (SingleResponse) res;
|
||||
results[0] = singleResponse.getEntry();
|
||||
updateResult(0, singleResponse.getEntry());
|
||||
}
|
||||
decActionCounter(1);
|
||||
}
|
||||
|
@ -706,27 +705,17 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
|
|||
Retry canRetry = errorsByServer.canTryMore(numAttempt)
|
||||
? Retry.YES : Retry.NO_RETRIES_EXHAUSTED;
|
||||
|
||||
if (tableName == null && ClientExceptionsUtil.isMetaClearingException(t)) {
|
||||
// tableName is null when we made a cross-table RPC call.
|
||||
asyncProcess.connection.clearCaches(server);
|
||||
}
|
||||
int failed = 0, stopped = 0;
|
||||
cleanServerCache(server, t);
|
||||
int failed = 0;
|
||||
int stopped = 0;
|
||||
List<Action> toReplay = new ArrayList<>();
|
||||
for (Map.Entry<byte[], List<Action>> e : rsActions.actions.entrySet()) {
|
||||
byte[] regionName = e.getKey();
|
||||
byte[] row = e.getValue().iterator().next().getAction().getRow();
|
||||
byte[] row = e.getValue().get(0).getAction().getRow();
|
||||
// Do not use the exception for updating cache because it might be coming from
|
||||
// any of the regions in the MultiAction.
|
||||
try {
|
||||
if (tableName != null) {
|
||||
asyncProcess.connection.updateCachedLocations(tableName, regionName, row,
|
||||
ClientExceptionsUtil.isMetaClearingException(t) ? null : t, server);
|
||||
}
|
||||
} catch (Throwable ex) {
|
||||
// That should never happen, but if it did, we want to make sure
|
||||
// we still process errors
|
||||
LOG.error("Couldn't update cached region locations: " + ex);
|
||||
}
|
||||
updateCachedLocations(server, regionName, row,
|
||||
ClientExceptionsUtil.isMetaClearingException(t) ? null : t);
|
||||
for (Action action : e.getValue()) {
|
||||
Retry retry = manageError(
|
||||
action.getOriginalIndex(), action.getAction(), canRetry, t, server);
|
||||
|
@ -819,6 +808,9 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
|
|||
ServerName server, MultiResponse responses, int numAttempt) {
|
||||
assert responses != null;
|
||||
|
||||
Map<byte[], MultiResponse.RegionResult> results = responses.getResults();
|
||||
updateStats(server, results);
|
||||
|
||||
// Success or partial success
|
||||
// Analyze detailed results. We can still have individual failures to be redo.
|
||||
// two specific throwables are managed:
|
||||
|
@ -826,126 +818,111 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
|
|||
// - RegionMovedException: we update the cache with the new region location
|
||||
|
||||
List<Action> toReplay = new ArrayList<>();
|
||||
Throwable throwable = null;
|
||||
Throwable lastException = null;
|
||||
int failureCount = 0;
|
||||
boolean canRetry = true;
|
||||
|
||||
Map<byte[], MultiResponse.RegionResult> results = responses.getResults();
|
||||
updateStats(server, results);
|
||||
|
||||
int failed = 0, stopped = 0;
|
||||
int failed = 0;
|
||||
int stopped = 0;
|
||||
Retry retry = null;
|
||||
// Go by original action.
|
||||
for (Map.Entry<byte[], List<Action>> regionEntry : multiAction.actions.entrySet()) {
|
||||
byte[] regionName = regionEntry.getKey();
|
||||
Map<Integer, Object> regionResults = results.get(regionName) == null
|
||||
? null : results.get(regionName).result;
|
||||
if (regionResults == null) {
|
||||
if (!responses.getExceptions().containsKey(regionName)) {
|
||||
LOG.error("Server sent us neither results nor exceptions for "
|
||||
+ Bytes.toStringBinary(regionName));
|
||||
responses.getExceptions().put(regionName, new RuntimeException("Invalid response"));
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
Throwable regionException = responses.getExceptions().get(regionName);
|
||||
cleanServerCache(server, regionException);
|
||||
|
||||
Map<Integer, Object> regionResults =
|
||||
results.containsKey(regionName) ? results.get(regionName).result : Collections.emptyMap();
|
||||
boolean regionFailureRegistered = false;
|
||||
for (Action sentAction : regionEntry.getValue()) {
|
||||
Object result = regionResults.get(sentAction.getOriginalIndex());
|
||||
if (result == null) {
|
||||
if (regionException == null) {
|
||||
LOG.error("Server sent us neither results nor exceptions for "
|
||||
+ Bytes.toStringBinary(regionName)
|
||||
+ ", numAttempt:" + numAttempt);
|
||||
regionException = new RuntimeException("Invalid response");
|
||||
}
|
||||
// If the row operation encounters the region-lever error, the exception of action may be
|
||||
// null.
|
||||
result = regionException;
|
||||
}
|
||||
// Failure: retry if it's make sense else update the errors lists
|
||||
if (result == null || result instanceof Throwable) {
|
||||
if (result instanceof Throwable) {
|
||||
Throwable actionException = (Throwable) result;
|
||||
Row row = sentAction.getAction();
|
||||
throwable = ClientExceptionsUtil.findException(result);
|
||||
lastException = regionException != null ? regionException
|
||||
: ClientExceptionsUtil.findException(actionException);
|
||||
// Register corresponding failures once per server/once per region.
|
||||
if (!regionFailureRegistered) {
|
||||
regionFailureRegistered = true;
|
||||
try {
|
||||
asyncProcess.connection.updateCachedLocations(
|
||||
tableName, regionName, row.getRow(), result, server);
|
||||
} catch (Throwable ex) {
|
||||
// That should never happen, but if it did, we want to make sure
|
||||
// we still process errors
|
||||
LOG.error("Couldn't update cached region locations: " + ex);
|
||||
}
|
||||
updateCachedLocations(server, regionName, row.getRow(), actionException);
|
||||
}
|
||||
if (failureCount == 0) {
|
||||
if (retry == null) {
|
||||
errorsByServer.reportServerError(server);
|
||||
// We determine canRetry only once for all calls, after reporting server failure.
|
||||
canRetry = errorsByServer.canTryMore(numAttempt);
|
||||
retry = errorsByServer.canTryMore(numAttempt) ?
|
||||
Retry.YES : Retry.NO_RETRIES_EXHAUSTED;
|
||||
}
|
||||
++failureCount;
|
||||
Retry retry = manageError(sentAction.getOriginalIndex(), row,
|
||||
canRetry ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED, (Throwable) result, server);
|
||||
if (retry == Retry.YES) {
|
||||
toReplay.add(sentAction);
|
||||
} else if (retry == Retry.NO_OTHER_SUCCEEDED) {
|
||||
++stopped;
|
||||
} else {
|
||||
++failed;
|
||||
switch (manageError(sentAction.getOriginalIndex(), row, retry, actionException,
|
||||
server)) {
|
||||
case YES:
|
||||
toReplay.add(sentAction);
|
||||
break;
|
||||
case NO_OTHER_SUCCEEDED:
|
||||
++stopped;
|
||||
break;
|
||||
default:
|
||||
++failed;
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
if (callback != null) {
|
||||
try {
|
||||
//noinspection unchecked
|
||||
// TODO: would callback expect a replica region name if it gets one?
|
||||
this.callback.update(regionName, sentAction.getAction().getRow(), (CResult) result);
|
||||
} catch (Throwable t) {
|
||||
LOG.error("User callback threw an exception for "
|
||||
+ Bytes.toStringBinary(regionName) + ", ignoring", t);
|
||||
}
|
||||
}
|
||||
invokeCallBack(regionName, sentAction.getAction().getRow(), (CResult) result);
|
||||
setResult(sentAction, result);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (toReplay.isEmpty()) {
|
||||
logNoResubmit(server, numAttempt, failureCount, lastException, failed, stopped);
|
||||
} else {
|
||||
resubmit(server, toReplay, numAttempt, failureCount, lastException);
|
||||
}
|
||||
}
|
||||
|
||||
// The failures global to a region. We will use for multiAction we sent previously to find the
|
||||
// actions to replay.
|
||||
for (Map.Entry<byte[], Throwable> throwableEntry : responses.getExceptions().entrySet()) {
|
||||
throwable = throwableEntry.getValue();
|
||||
byte[] region = throwableEntry.getKey();
|
||||
List<Action> actions = multiAction.actions.get(region);
|
||||
if (actions == null || actions.isEmpty()) {
|
||||
throw new IllegalStateException("Wrong response for the region: " +
|
||||
HRegionInfo.encodeRegionName(region));
|
||||
}
|
||||
private void updateCachedLocations(ServerName server, byte[] regionName, byte[] row,
|
||||
Throwable rowException) {
|
||||
if (tableName == null) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
asyncProcess.connection
|
||||
.updateCachedLocations(tableName, regionName, row, rowException, server);
|
||||
} catch (Throwable ex) {
|
||||
// That should never happen, but if it did, we want to make sure
|
||||
// we still process errors
|
||||
LOG.error("Couldn't update cached region locations: " + ex);
|
||||
}
|
||||
}
|
||||
|
||||
if (failureCount == 0) {
|
||||
errorsByServer.reportServerError(server);
|
||||
canRetry = errorsByServer.canTryMore(numAttempt);
|
||||
}
|
||||
if (null == tableName && ClientExceptionsUtil.isMetaClearingException(throwable)) {
|
||||
// For multi-actions, we don't have a table name, but we want to make sure to clear the
|
||||
// cache in case there were location-related exceptions. We don't to clear the cache
|
||||
// for every possible exception that comes through, however.
|
||||
asyncProcess.connection.clearCaches(server);
|
||||
} else {
|
||||
try {
|
||||
asyncProcess.connection.updateCachedLocations(
|
||||
tableName, region, actions.get(0).getAction().getRow(), throwable, server);
|
||||
} catch (Throwable ex) {
|
||||
// That should never happen, but if it did, we want to make sure
|
||||
// we still process errors
|
||||
LOG.error("Couldn't update cached region locations: " + ex);
|
||||
}
|
||||
}
|
||||
failureCount += actions.size();
|
||||
|
||||
for (Action action : actions) {
|
||||
Row row = action.getAction();
|
||||
Retry retry = manageError(action.getOriginalIndex(), row,
|
||||
canRetry ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED, throwable, server);
|
||||
if (retry == Retry.YES) {
|
||||
toReplay.add(action);
|
||||
} else if (retry == Retry.NO_OTHER_SUCCEEDED) {
|
||||
++stopped;
|
||||
} else {
|
||||
++failed;
|
||||
}
|
||||
private void invokeCallBack(byte[] regionName, byte[] row, CResult result) {
|
||||
if (callback != null) {
|
||||
try {
|
||||
//noinspection unchecked
|
||||
// TODO: would callback expect a replica region name if it gets one?
|
||||
this.callback.update(regionName, row, result);
|
||||
} catch (Throwable t) {
|
||||
LOG.error("User callback threw an exception for "
|
||||
+ Bytes.toStringBinary(regionName) + ", ignoring", t);
|
||||
}
|
||||
}
|
||||
if (toReplay.isEmpty()) {
|
||||
logNoResubmit(server, numAttempt, failureCount, throwable, failed, stopped);
|
||||
} else {
|
||||
resubmit(server, toReplay, numAttempt, failureCount, throwable);
|
||||
}
|
||||
|
||||
private void cleanServerCache(ServerName server, Throwable regionException) {
|
||||
if (tableName == null && ClientExceptionsUtil.isMetaClearingException(regionException)) {
|
||||
// For multi-actions, we don't have a table name, but we want to make sure to clear the
|
||||
// cache in case there were location-related exceptions. We don't to clear the cache
|
||||
// for every possible exception that comes through, however.
|
||||
asyncProcess.connection.clearCaches(server);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1041,7 +1018,7 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
|
|||
if (results[index] != state) {
|
||||
throw new AssertionError("We set the callCount but someone else replaced the result");
|
||||
}
|
||||
results[index] = result;
|
||||
updateResult(index, result);
|
||||
}
|
||||
|
||||
decActionCounter(index);
|
||||
|
@ -1099,7 +1076,7 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
|
|||
if (results[index] != state) {
|
||||
throw new AssertionError("We set the callCount but someone else replaced the result");
|
||||
}
|
||||
results[index] = throwable;
|
||||
updateResult(index, throwable);
|
||||
}
|
||||
decActionCounter(index);
|
||||
}
|
||||
|
@ -1130,7 +1107,7 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
|
|||
if (isFromReplica) {
|
||||
throw new AssertionError("Unexpected stale result for " + row);
|
||||
}
|
||||
results[index] = result;
|
||||
updateResult(index, result);
|
||||
} else {
|
||||
synchronized (replicaResultLock) {
|
||||
resObj = results[index];
|
||||
|
@ -1138,7 +1115,7 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
|
|||
if (isFromReplica) {
|
||||
throw new AssertionError("Unexpected stale result for " + row);
|
||||
}
|
||||
results[index] = result;
|
||||
updateResult(index, result);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1276,4 +1253,20 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
|
|||
return new MultiServerCallable(asyncProcess.connection, tableName, server,
|
||||
multi, asyncProcess.rpcFactory.newController(), rpcTimeout, tracker, multi.getPriority());
|
||||
}
|
||||
|
||||
private void updateResult(int index, Object result) {
|
||||
Object current = results[index];
|
||||
if (current != null) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("The result is assigned repeatedly! current:" + current
|
||||
+ ", new:" + result);
|
||||
}
|
||||
}
|
||||
results[index] = result;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
long getNumberOfActionsInProgress() {
|
||||
return actionsInProgress.get();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,252 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InterruptedIOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionLocation;
|
||||
import org.apache.hadoop.hbase.RegionLocations;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
|
||||
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
/**
|
||||
* The purpose of this test is to make sure the region exception won't corrupt the results
|
||||
* of batch. The prescription is shown below.
|
||||
* 1) honor the action result rather than region exception. If the action have both of true result
|
||||
* and region exception, the action is fine as the exception is caused by other actions
|
||||
* which are in the same region.
|
||||
* 2) honor the action exception rather than region exception. If the action have both of action
|
||||
* exception and region exception, we deal with the action exception only. If we also
|
||||
* handle the region exception for the same action, it will introduce the negative count of
|
||||
* actions in progress. The AsyncRequestFuture#waitUntilDone will block forever.
|
||||
*
|
||||
* This bug can be reproduced by real use case. see TestMalformedCellFromClient(in branch-1.4+).
|
||||
* It uses the batch of RowMutations to present the bug. Given that the batch of RowMutations is
|
||||
* only supported by branch-1.4+, perhaps the branch-1.3 and branch-1.2 won't encounter this issue.
|
||||
* We still backport the fix to branch-1.3 and branch-1.2 in case we ignore some write paths.
|
||||
*/
|
||||
@Category({ ClientTests.class, SmallTests.class })
|
||||
public class TestAsyncProcessWithRegionException {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestAsyncProcessWithRegionException.class);
|
||||
|
||||
private static final Result EMPTY_RESULT = Result.create(null, true);
|
||||
private static final IOException IOE = new IOException("YOU CAN'T PASS");
|
||||
private static final Configuration CONF = new Configuration();
|
||||
private static final TableName DUMMY_TABLE = TableName.valueOf("DUMMY_TABLE");
|
||||
private static final byte[] GOOD_ROW = Bytes.toBytes("GOOD_ROW");
|
||||
private static final byte[] BAD_ROW = Bytes.toBytes("BAD_ROW");
|
||||
private static final byte[] BAD_ROW_WITHOUT_ACTION_EXCEPTION =
|
||||
Bytes.toBytes("BAD_ROW_WITHOUT_ACTION_EXCEPTION");
|
||||
private static final byte[] FAMILY = Bytes.toBytes("FAMILY");
|
||||
private static final ServerName SERVER_NAME = ServerName.valueOf("s1,1,1");
|
||||
private static final RegionInfo REGION_INFO =
|
||||
RegionInfoBuilder.newBuilder(DUMMY_TABLE)
|
||||
.setStartKey(HConstants.EMPTY_START_ROW)
|
||||
.setEndKey(HConstants.EMPTY_END_ROW)
|
||||
.setSplit(false)
|
||||
.setRegionId(1)
|
||||
.build();
|
||||
|
||||
private static final HRegionLocation REGION_LOCATION =
|
||||
new HRegionLocation(REGION_INFO, SERVER_NAME);
|
||||
|
||||
@BeforeClass
|
||||
public static void setUpBeforeClass() {
|
||||
// disable the retry
|
||||
CONF.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSuccessivePut() throws Exception {
|
||||
MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF);
|
||||
|
||||
List<Put> puts = new ArrayList<>(1);
|
||||
puts.add(new Put(GOOD_ROW).addColumn(FAMILY, FAMILY, FAMILY));
|
||||
final int expectedSize = puts.size();
|
||||
AsyncRequestFuture arf = ap.submit(DUMMY_TABLE, puts);
|
||||
arf.waitUntilDone();
|
||||
Object[] result = arf.getResults();
|
||||
assertEquals(expectedSize, result.length);
|
||||
for (Object r : result) {
|
||||
assertEquals(Result.class, r.getClass());
|
||||
}
|
||||
assertTrue(puts.isEmpty());
|
||||
assertActionsInProgress(arf);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFailedPut() throws Exception {
|
||||
MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF);
|
||||
|
||||
List<Put> puts = new ArrayList<>(2);
|
||||
puts.add(new Put(GOOD_ROW).addColumn(FAMILY, FAMILY, FAMILY));
|
||||
// this put should fail
|
||||
puts.add(new Put(BAD_ROW).addColumn(FAMILY, FAMILY, FAMILY));
|
||||
final int expectedSize = puts.size();
|
||||
|
||||
AsyncRequestFuture arf = ap.submit(DUMMY_TABLE, puts);
|
||||
arf.waitUntilDone();
|
||||
// There is a failed puts
|
||||
assertError(arf, 1);
|
||||
Object[] result = arf.getResults();
|
||||
assertEquals(expectedSize, result.length);
|
||||
assertEquals(Result.class, result[0].getClass());
|
||||
assertTrue(result[1] instanceof IOException);
|
||||
assertTrue(puts.isEmpty());
|
||||
assertActionsInProgress(arf);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFailedPutWithoutActionException() throws Exception {
|
||||
MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF);
|
||||
|
||||
List<Put> puts = new ArrayList<>(3);
|
||||
puts.add(new Put(GOOD_ROW).addColumn(FAMILY, FAMILY, FAMILY));
|
||||
// this put should fail
|
||||
puts.add(new Put(BAD_ROW).addColumn(FAMILY, FAMILY, FAMILY));
|
||||
// this put should fail, and it won't have action exception
|
||||
puts.add(new Put(BAD_ROW_WITHOUT_ACTION_EXCEPTION).addColumn(FAMILY, FAMILY, FAMILY));
|
||||
final int expectedSize = puts.size();
|
||||
|
||||
AsyncRequestFuture arf = ap.submit(DUMMY_TABLE, puts);
|
||||
arf.waitUntilDone();
|
||||
// There are two failed puts
|
||||
assertError(arf, 2);
|
||||
Object[] result = arf.getResults();
|
||||
assertEquals(expectedSize, result.length);
|
||||
assertEquals(Result.class, result[0].getClass());
|
||||
assertTrue(result[1] instanceof IOException);
|
||||
assertTrue(result[2] instanceof IOException);
|
||||
assertTrue(puts.isEmpty());
|
||||
assertActionsInProgress(arf);
|
||||
}
|
||||
|
||||
private static void assertError(AsyncRequestFuture arf, int expectedCountOfFailure) {
|
||||
assertTrue(arf.hasError());
|
||||
RetriesExhaustedWithDetailsException e = arf.getErrors();
|
||||
List<Throwable> errors = e.getCauses();
|
||||
assertEquals(expectedCountOfFailure, errors.size());
|
||||
for (Throwable t : errors) {
|
||||
assertTrue(t instanceof IOException);
|
||||
}
|
||||
}
|
||||
|
||||
private static void assertActionsInProgress(AsyncRequestFuture arf) {
|
||||
if (arf instanceof AsyncRequestFutureImpl) {
|
||||
assertEquals(0, ((AsyncRequestFutureImpl) arf).getNumberOfActionsInProgress());
|
||||
}
|
||||
}
|
||||
|
||||
private static ClusterConnection createHConnection() throws IOException {
|
||||
ClusterConnection hc = Mockito.mock(ClusterConnection.class);
|
||||
NonceGenerator ng = Mockito.mock(NonceGenerator.class);
|
||||
Mockito.when(ng.getNonceGroup()).thenReturn(HConstants.NO_NONCE);
|
||||
Mockito.when(hc.getNonceGenerator()).thenReturn(ng);
|
||||
Mockito.when(hc.getConfiguration()).thenReturn(CONF);
|
||||
Mockito.when(hc.getConnectionConfiguration()).thenReturn(new ConnectionConfiguration(CONF));
|
||||
setMockLocation(hc, GOOD_ROW, new RegionLocations(REGION_LOCATION));
|
||||
setMockLocation(hc, BAD_ROW, new RegionLocations(REGION_LOCATION));
|
||||
Mockito
|
||||
.when(hc.locateRegions(Mockito.eq(DUMMY_TABLE), Mockito.anyBoolean(), Mockito.anyBoolean()))
|
||||
.thenReturn(Collections.singletonList(REGION_LOCATION));
|
||||
return hc;
|
||||
}
|
||||
|
||||
private static void setMockLocation(ClusterConnection hc, byte[] row, RegionLocations result)
|
||||
throws IOException {
|
||||
Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE), Mockito.eq(row), Mockito.anyBoolean(),
|
||||
Mockito.anyBoolean(), Mockito.anyInt())).thenReturn(result);
|
||||
Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE), Mockito.eq(row), Mockito.anyBoolean(),
|
||||
Mockito.anyBoolean())).thenReturn(result);
|
||||
}
|
||||
|
||||
private static class MyAsyncProcess extends AsyncProcess {
|
||||
private final ExecutorService service = Executors.newFixedThreadPool(5);
|
||||
|
||||
MyAsyncProcess(ClusterConnection hc, Configuration conf) {
|
||||
super(hc, conf, new RpcRetryingCallerFactory(conf), false, new RpcControllerFactory(conf));
|
||||
}
|
||||
|
||||
public AsyncRequestFuture submit(TableName tableName, List<? extends Row> rows)
|
||||
throws InterruptedIOException {
|
||||
return submit(AsyncProcessTask.newBuilder()
|
||||
.setPool(service)
|
||||
.setTableName(tableName)
|
||||
.setRowAccess(rows)
|
||||
.setSubmittedRows(AsyncProcessTask.SubmittedRows.NORMAL)
|
||||
.setNeedResults(true)
|
||||
.setRpcTimeout(HConstants.DEFAULT_HBASE_RPC_TIMEOUT)
|
||||
.setOperationTimeout(HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT)
|
||||
.build());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected RpcRetryingCaller<AbstractResponse> createCaller(
|
||||
CancellableRegionServerCallable callable, int rpcTimeout) {
|
||||
MultiServerCallable callable1 = (MultiServerCallable) callable;
|
||||
MultiResponse mr = new MultiResponse();
|
||||
callable1.getMulti().actions.forEach((regionName, actions) -> {
|
||||
actions.forEach(action -> {
|
||||
if (Bytes.equals(action.getAction().getRow(), GOOD_ROW)) {
|
||||
mr.add(regionName, action.getOriginalIndex(), EMPTY_RESULT);
|
||||
} else if (Bytes.equals(action.getAction().getRow(), BAD_ROW)) {
|
||||
mr.add(regionName, action.getOriginalIndex(), IOE);
|
||||
}
|
||||
});
|
||||
});
|
||||
mr.addException(REGION_INFO.getRegionName(), IOE);
|
||||
return new RpcRetryingCallerImpl<AbstractResponse>(100, 500, 0, 9) {
|
||||
@Override
|
||||
public AbstractResponse callWithoutRetries(RetryingCallable<AbstractResponse> callable,
|
||||
int callTimeout) {
|
||||
try {
|
||||
// sleep one second in order for threadpool to start another thread instead of reusing
|
||||
// existing one.
|
||||
Thread.sleep(1000);
|
||||
} catch (InterruptedException e) {
|
||||
// pass
|
||||
}
|
||||
return mr;
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,173 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.CellUtil;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
/**
|
||||
* The purpose of this test is to make sure the region exception won't corrupt the results
|
||||
* of batch. The prescription is shown below.
|
||||
* 1) honor the action result rather than region exception. If the action have both of true result
|
||||
* and region exception, the action is fine as the exception is caused by other actions
|
||||
* which are in the same region.
|
||||
* 2) honor the action exception rather than region exception. If the action have both of action
|
||||
* exception and region exception, we deal with the action exception only. If we also
|
||||
* handle the region exception for the same action, it will introduce the negative count of
|
||||
* actions in progress. The AsyncRequestFuture#waitUntilDone will block forever.
|
||||
*
|
||||
* The no-cluster test is in TestAsyncProcessWithRegionException.
|
||||
*/
|
||||
@Category({ MediumTests.class, ClientTests.class })
|
||||
public class TestMalformedCellFromClient {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestMalformedCellFromClient.class);
|
||||
|
||||
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||
private static final byte[] FAMILY = Bytes.toBytes("testFamily");
|
||||
private static final int CELL_SIZE = 100;
|
||||
private static final TableName TABLE_NAME = TableName.valueOf("TestMalformedCellFromClient");
|
||||
|
||||
@BeforeClass
|
||||
public static void setUpBeforeClass() throws Exception {
|
||||
// disable the retry
|
||||
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);
|
||||
TEST_UTIL.startMiniCluster(1);
|
||||
}
|
||||
|
||||
@Before
|
||||
public void before() throws Exception {
|
||||
TableDescriptor desc = TableDescriptorBuilder.newBuilder(TABLE_NAME)
|
||||
.addColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY))
|
||||
.setValue(HRegion.HBASE_MAX_CELL_SIZE_KEY, String.valueOf(CELL_SIZE)).build();
|
||||
TEST_UTIL.getConnection().getAdmin().createTable(desc);
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
for (TableDescriptor htd : TEST_UTIL.getAdmin().listTableDescriptors()) {
|
||||
TEST_UTIL.deleteTable(htd.getTableName());
|
||||
}
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDownAfterClass() throws Exception {
|
||||
TEST_UTIL.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
/**
|
||||
* The purpose of this ut is to check the consistency between the exception and results.
|
||||
* If the RetriesExhaustedWithDetailsException contains the whole batch,
|
||||
* each result should be of IOE. Otherwise, the row operation which is not in the exception
|
||||
* should have a true result.
|
||||
*/
|
||||
@Test
|
||||
public void testRegionException() throws InterruptedException, IOException {
|
||||
List<Row> batches = new ArrayList<>();
|
||||
batches.add(new Put(Bytes.toBytes("good")).addColumn(FAMILY, null, new byte[10]));
|
||||
// the rm is used to prompt the region exception.
|
||||
// see RSRpcServices#multi
|
||||
RowMutations rm = new RowMutations(Bytes.toBytes("fail"));
|
||||
rm.add(new Put(rm.getRow()).addColumn(FAMILY, null, new byte[CELL_SIZE]));
|
||||
batches.add(rm);
|
||||
Object[] results = new Object[batches.size()];
|
||||
|
||||
try (Table table = TEST_UTIL.getConnection().getTable(TABLE_NAME)) {
|
||||
Throwable exceptionByCaught = null;
|
||||
try {
|
||||
table.batch(batches, results);
|
||||
fail("Where is the exception? We put the malformed cells!!!");
|
||||
} catch (RetriesExhaustedWithDetailsException e) {
|
||||
for (Throwable throwable : e.getCauses()) {
|
||||
assertNotNull(throwable);
|
||||
}
|
||||
assertEquals(1, e.getNumExceptions());
|
||||
exceptionByCaught = e.getCause(0);
|
||||
}
|
||||
for (Object obj : results) {
|
||||
assertNotNull(obj);
|
||||
}
|
||||
assertEquals(Result.class, results[0].getClass());
|
||||
assertEquals(exceptionByCaught.getClass(), results[1].getClass());
|
||||
Result result = table.get(new Get(Bytes.toBytes("good")));
|
||||
assertEquals(1, result.size());
|
||||
Cell cell = result.getColumnLatestCell(FAMILY, null);
|
||||
assertTrue(Bytes.equals(CellUtil.cloneValue(cell), new byte[10]));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* The purpose of this ut is to check the consistency between the exception and results.
|
||||
* If the RetriesExhaustedWithDetailsException contains the whole batch,
|
||||
* each result should be of IOE. Otherwise, the row operation which is not in the exception
|
||||
* should have a true result.
|
||||
*/
|
||||
@Test
|
||||
public void testRegionExceptionByAsync() throws Exception {
|
||||
List<Row> batches = new ArrayList<>();
|
||||
batches.add(new Put(Bytes.toBytes("good")).addColumn(FAMILY, null, new byte[10]));
|
||||
// the rm is used to prompt the region exception.
|
||||
// see RSRpcServices#multi
|
||||
RowMutations rm = new RowMutations(Bytes.toBytes("fail"));
|
||||
rm.add(new Put(rm.getRow()).addColumn(FAMILY, null, new byte[CELL_SIZE]));
|
||||
batches.add(rm);
|
||||
try (AsyncConnection asyncConnection = ConnectionFactory
|
||||
.createAsyncConnection(TEST_UTIL.getConfiguration()).get()) {
|
||||
AsyncTable<AdvancedScanResultConsumer> table = asyncConnection.getTable(TABLE_NAME);
|
||||
List<CompletableFuture<AdvancedScanResultConsumer>> results = table.batch(batches);
|
||||
assertEquals(2, results.size());
|
||||
try {
|
||||
results.get(1).get();
|
||||
fail("Where is the exception? We put the malformed cells!!!");
|
||||
} catch (ExecutionException e) {
|
||||
// pass
|
||||
}
|
||||
Result result = table.get(new Get(Bytes.toBytes("good"))).get();
|
||||
assertEquals(1, result.size());
|
||||
Cell cell = result.getColumnLatestCell(FAMILY, null);
|
||||
assertTrue(Bytes.equals(CellUtil.cloneValue(cell), new byte[10]));
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue