HBASE-19900 Region-level exception destroy the result of batch

This commit is contained in:
Chia-Ping Tsai 2018-02-06 05:33:37 +08:00
parent a5b86dd77a
commit d8b999e695
4 changed files with 558 additions and 139 deletions

View File

@ -251,8 +251,8 @@ class AsyncBatchRpcRetryingCaller<T> {
@SuppressWarnings("unchecked")
private void onComplete(Action action, RegionRequest regionReq, int tries, ServerName serverName,
RegionResult regionResult, List<Action> failedActions) {
Object result = regionResult.result.get(action.getOriginalIndex());
RegionResult regionResult, List<Action> failedActions, Throwable regionException) {
Object result = regionResult.result.getOrDefault(action.getOriginalIndex(), regionException);
if (result == null) {
LOG.error("Server " + serverName + " sent us neither result nor exception for row '"
+ Bytes.toStringBinary(action.getAction().getRow()) + "' of "
@ -279,27 +279,28 @@ class AsyncBatchRpcRetryingCaller<T> {
List<Action> failedActions = new ArrayList<>();
actionsByRegion.forEach((rn, regionReq) -> {
RegionResult regionResult = resp.getResults().get(rn);
Throwable regionException = resp.getException(rn);
if (regionResult != null) {
regionReq.actions.forEach(
action -> onComplete(action, regionReq, tries, serverName, regionResult, failedActions));
action -> onComplete(action, regionReq, tries, serverName, regionResult, failedActions,
regionException));
} else {
Throwable t = resp.getException(rn);
Throwable error;
if (t == null) {
if (regionException == null) {
LOG.error(
"Server sent us neither results nor exceptions for " + Bytes.toStringBinary(rn));
error = new RuntimeException("Invalid response");
} else {
error = translateException(t);
logException(tries, () -> Stream.of(regionReq), error, serverName);
conn.getLocator().updateCachedLocation(regionReq.loc, error);
if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {
failAll(regionReq.actions.stream(), tries, error, serverName);
return;
}
addError(regionReq.actions, error, serverName);
failedActions.addAll(regionReq.actions);
error = translateException(regionException);
}
logException(tries, () -> Stream.of(regionReq), error, serverName);
conn.getLocator().updateCachedLocation(regionReq.loc, error);
if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {
failAll(regionReq.actions.stream(), tries, error, serverName);
return;
}
addError(regionReq.actions, error, serverName);
failedActions.addAll(regionReq.actions);
}
});
if (!failedActions.isEmpty()) {

View File

@ -19,8 +19,6 @@
package org.apache.hadoop.hbase.client;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
@ -36,28 +34,29 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.hbase.CallQueueTooBigException;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.RetryImmediatelyException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.htrace.core.Tracer;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
/**
* The context, and return value, for a single submit/submitAll call.
@ -152,7 +151,7 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
if (results[index] != null) return;
// We set the number of calls here. After that any path must call setResult/setError.
// True even for replicas that are not found - if we refuse to send we MUST set error.
results[index] = new ReplicaResultState(locs.length);
updateResult(index, new ReplicaResultState(locs.length));
}
for (int i = 1; i < locs.length; ++i) {
Action replicaAction = new Action(action, i);
@ -234,7 +233,7 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
} else {
if (results != null) {
SingleResponse singleResponse = (SingleResponse) res;
results[0] = singleResponse.getEntry();
updateResult(0, singleResponse.getEntry());
}
decActionCounter(1);
}
@ -706,27 +705,17 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
Retry canRetry = errorsByServer.canTryMore(numAttempt)
? Retry.YES : Retry.NO_RETRIES_EXHAUSTED;
if (tableName == null && ClientExceptionsUtil.isMetaClearingException(t)) {
// tableName is null when we made a cross-table RPC call.
asyncProcess.connection.clearCaches(server);
}
int failed = 0, stopped = 0;
cleanServerCache(server, t);
int failed = 0;
int stopped = 0;
List<Action> toReplay = new ArrayList<>();
for (Map.Entry<byte[], List<Action>> e : rsActions.actions.entrySet()) {
byte[] regionName = e.getKey();
byte[] row = e.getValue().iterator().next().getAction().getRow();
byte[] row = e.getValue().get(0).getAction().getRow();
// Do not use the exception for updating cache because it might be coming from
// any of the regions in the MultiAction.
try {
if (tableName != null) {
asyncProcess.connection.updateCachedLocations(tableName, regionName, row,
ClientExceptionsUtil.isMetaClearingException(t) ? null : t, server);
}
} catch (Throwable ex) {
// That should never happen, but if it did, we want to make sure
// we still process errors
LOG.error("Couldn't update cached region locations: " + ex);
}
updateCachedLocations(server, regionName, row,
ClientExceptionsUtil.isMetaClearingException(t) ? null : t);
for (Action action : e.getValue()) {
Retry retry = manageError(
action.getOriginalIndex(), action.getAction(), canRetry, t, server);
@ -819,6 +808,9 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
ServerName server, MultiResponse responses, int numAttempt) {
assert responses != null;
Map<byte[], MultiResponse.RegionResult> results = responses.getResults();
updateStats(server, results);
// Success or partial success
// Analyze detailed results. We can still have individual failures to be redo.
// two specific throwables are managed:
@ -826,126 +818,111 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
// - RegionMovedException: we update the cache with the new region location
List<Action> toReplay = new ArrayList<>();
Throwable throwable = null;
Throwable lastException = null;
int failureCount = 0;
boolean canRetry = true;
Map<byte[], MultiResponse.RegionResult> results = responses.getResults();
updateStats(server, results);
int failed = 0, stopped = 0;
int failed = 0;
int stopped = 0;
Retry retry = null;
// Go by original action.
for (Map.Entry<byte[], List<Action>> regionEntry : multiAction.actions.entrySet()) {
byte[] regionName = regionEntry.getKey();
Map<Integer, Object> regionResults = results.get(regionName) == null
? null : results.get(regionName).result;
if (regionResults == null) {
if (!responses.getExceptions().containsKey(regionName)) {
LOG.error("Server sent us neither results nor exceptions for "
+ Bytes.toStringBinary(regionName));
responses.getExceptions().put(regionName, new RuntimeException("Invalid response"));
}
continue;
}
Throwable regionException = responses.getExceptions().get(regionName);
cleanServerCache(server, regionException);
Map<Integer, Object> regionResults =
results.containsKey(regionName) ? results.get(regionName).result : Collections.emptyMap();
boolean regionFailureRegistered = false;
for (Action sentAction : regionEntry.getValue()) {
Object result = regionResults.get(sentAction.getOriginalIndex());
if (result == null) {
if (regionException == null) {
LOG.error("Server sent us neither results nor exceptions for "
+ Bytes.toStringBinary(regionName)
+ ", numAttempt:" + numAttempt);
regionException = new RuntimeException("Invalid response");
}
// If the row operation encounters the region-lever error, the exception of action may be
// null.
result = regionException;
}
// Failure: retry if it's make sense else update the errors lists
if (result == null || result instanceof Throwable) {
if (result instanceof Throwable) {
Throwable actionException = (Throwable) result;
Row row = sentAction.getAction();
throwable = ClientExceptionsUtil.findException(result);
lastException = regionException != null ? regionException
: ClientExceptionsUtil.findException(actionException);
// Register corresponding failures once per server/once per region.
if (!regionFailureRegistered) {
regionFailureRegistered = true;
try {
asyncProcess.connection.updateCachedLocations(
tableName, regionName, row.getRow(), result, server);
} catch (Throwable ex) {
// That should never happen, but if it did, we want to make sure
// we still process errors
LOG.error("Couldn't update cached region locations: " + ex);
}
updateCachedLocations(server, regionName, row.getRow(), actionException);
}
if (failureCount == 0) {
if (retry == null) {
errorsByServer.reportServerError(server);
// We determine canRetry only once for all calls, after reporting server failure.
canRetry = errorsByServer.canTryMore(numAttempt);
retry = errorsByServer.canTryMore(numAttempt) ?
Retry.YES : Retry.NO_RETRIES_EXHAUSTED;
}
++failureCount;
Retry retry = manageError(sentAction.getOriginalIndex(), row,
canRetry ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED, (Throwable) result, server);
if (retry == Retry.YES) {
toReplay.add(sentAction);
} else if (retry == Retry.NO_OTHER_SUCCEEDED) {
++stopped;
} else {
++failed;
switch (manageError(sentAction.getOriginalIndex(), row, retry, actionException,
server)) {
case YES:
toReplay.add(sentAction);
break;
case NO_OTHER_SUCCEEDED:
++stopped;
break;
default:
++failed;
break;
}
} else {
if (callback != null) {
try {
//noinspection unchecked
// TODO: would callback expect a replica region name if it gets one?
this.callback.update(regionName, sentAction.getAction().getRow(), (CResult) result);
} catch (Throwable t) {
LOG.error("User callback threw an exception for "
+ Bytes.toStringBinary(regionName) + ", ignoring", t);
}
}
invokeCallBack(regionName, sentAction.getAction().getRow(), (CResult) result);
setResult(sentAction, result);
}
}
}
if (toReplay.isEmpty()) {
logNoResubmit(server, numAttempt, failureCount, lastException, failed, stopped);
} else {
resubmit(server, toReplay, numAttempt, failureCount, lastException);
}
}
// The failures global to a region. We will use for multiAction we sent previously to find the
// actions to replay.
for (Map.Entry<byte[], Throwable> throwableEntry : responses.getExceptions().entrySet()) {
throwable = throwableEntry.getValue();
byte[] region = throwableEntry.getKey();
List<Action> actions = multiAction.actions.get(region);
if (actions == null || actions.isEmpty()) {
throw new IllegalStateException("Wrong response for the region: " +
HRegionInfo.encodeRegionName(region));
}
private void updateCachedLocations(ServerName server, byte[] regionName, byte[] row,
Throwable rowException) {
if (tableName == null) {
return;
}
try {
asyncProcess.connection
.updateCachedLocations(tableName, regionName, row, rowException, server);
} catch (Throwable ex) {
// That should never happen, but if it did, we want to make sure
// we still process errors
LOG.error("Couldn't update cached region locations: " + ex);
}
}
if (failureCount == 0) {
errorsByServer.reportServerError(server);
canRetry = errorsByServer.canTryMore(numAttempt);
}
if (null == tableName && ClientExceptionsUtil.isMetaClearingException(throwable)) {
// For multi-actions, we don't have a table name, but we want to make sure to clear the
// cache in case there were location-related exceptions. We don't to clear the cache
// for every possible exception that comes through, however.
asyncProcess.connection.clearCaches(server);
} else {
try {
asyncProcess.connection.updateCachedLocations(
tableName, region, actions.get(0).getAction().getRow(), throwable, server);
} catch (Throwable ex) {
// That should never happen, but if it did, we want to make sure
// we still process errors
LOG.error("Couldn't update cached region locations: " + ex);
}
}
failureCount += actions.size();
for (Action action : actions) {
Row row = action.getAction();
Retry retry = manageError(action.getOriginalIndex(), row,
canRetry ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED, throwable, server);
if (retry == Retry.YES) {
toReplay.add(action);
} else if (retry == Retry.NO_OTHER_SUCCEEDED) {
++stopped;
} else {
++failed;
}
private void invokeCallBack(byte[] regionName, byte[] row, CResult result) {
if (callback != null) {
try {
//noinspection unchecked
// TODO: would callback expect a replica region name if it gets one?
this.callback.update(regionName, row, result);
} catch (Throwable t) {
LOG.error("User callback threw an exception for "
+ Bytes.toStringBinary(regionName) + ", ignoring", t);
}
}
if (toReplay.isEmpty()) {
logNoResubmit(server, numAttempt, failureCount, throwable, failed, stopped);
} else {
resubmit(server, toReplay, numAttempt, failureCount, throwable);
}
private void cleanServerCache(ServerName server, Throwable regionException) {
if (tableName == null && ClientExceptionsUtil.isMetaClearingException(regionException)) {
// For multi-actions, we don't have a table name, but we want to make sure to clear the
// cache in case there were location-related exceptions. We don't to clear the cache
// for every possible exception that comes through, however.
asyncProcess.connection.clearCaches(server);
}
}
@ -1041,7 +1018,7 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
if (results[index] != state) {
throw new AssertionError("We set the callCount but someone else replaced the result");
}
results[index] = result;
updateResult(index, result);
}
decActionCounter(index);
@ -1099,7 +1076,7 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
if (results[index] != state) {
throw new AssertionError("We set the callCount but someone else replaced the result");
}
results[index] = throwable;
updateResult(index, throwable);
}
decActionCounter(index);
}
@ -1130,7 +1107,7 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
if (isFromReplica) {
throw new AssertionError("Unexpected stale result for " + row);
}
results[index] = result;
updateResult(index, result);
} else {
synchronized (replicaResultLock) {
resObj = results[index];
@ -1138,7 +1115,7 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
if (isFromReplica) {
throw new AssertionError("Unexpected stale result for " + row);
}
results[index] = result;
updateResult(index, result);
}
}
}
@ -1276,4 +1253,20 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
return new MultiServerCallable(asyncProcess.connection, tableName, server,
multi, asyncProcess.rpcFactory.newController(), rpcTimeout, tracker, multi.getPriority());
}
private void updateResult(int index, Object result) {
Object current = results[index];
if (current != null) {
if (LOG.isDebugEnabled()) {
LOG.debug("The result is assigned repeatedly! current:" + current
+ ", new:" + result);
}
}
results[index] = result;
}
@VisibleForTesting
long getNumberOfActionsInProgress() {
return actionsInProgress.get();
}
}

View File

@ -0,0 +1,252 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
/**
* The purpose of this test is to make sure the region exception won't corrupt the results
* of batch. The prescription is shown below.
* 1) honor the action result rather than region exception. If the action have both of true result
* and region exception, the action is fine as the exception is caused by other actions
* which are in the same region.
* 2) honor the action exception rather than region exception. If the action have both of action
* exception and region exception, we deal with the action exception only. If we also
* handle the region exception for the same action, it will introduce the negative count of
* actions in progress. The AsyncRequestFuture#waitUntilDone will block forever.
*
* This bug can be reproduced by real use case. see TestMalformedCellFromClient(in branch-1.4+).
* It uses the batch of RowMutations to present the bug. Given that the batch of RowMutations is
* only supported by branch-1.4+, perhaps the branch-1.3 and branch-1.2 won't encounter this issue.
* We still backport the fix to branch-1.3 and branch-1.2 in case we ignore some write paths.
*/
@Category({ ClientTests.class, SmallTests.class })
public class TestAsyncProcessWithRegionException {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestAsyncProcessWithRegionException.class);
private static final Result EMPTY_RESULT = Result.create(null, true);
private static final IOException IOE = new IOException("YOU CAN'T PASS");
private static final Configuration CONF = new Configuration();
private static final TableName DUMMY_TABLE = TableName.valueOf("DUMMY_TABLE");
private static final byte[] GOOD_ROW = Bytes.toBytes("GOOD_ROW");
private static final byte[] BAD_ROW = Bytes.toBytes("BAD_ROW");
private static final byte[] BAD_ROW_WITHOUT_ACTION_EXCEPTION =
Bytes.toBytes("BAD_ROW_WITHOUT_ACTION_EXCEPTION");
private static final byte[] FAMILY = Bytes.toBytes("FAMILY");
private static final ServerName SERVER_NAME = ServerName.valueOf("s1,1,1");
private static final RegionInfo REGION_INFO =
RegionInfoBuilder.newBuilder(DUMMY_TABLE)
.setStartKey(HConstants.EMPTY_START_ROW)
.setEndKey(HConstants.EMPTY_END_ROW)
.setSplit(false)
.setRegionId(1)
.build();
private static final HRegionLocation REGION_LOCATION =
new HRegionLocation(REGION_INFO, SERVER_NAME);
@BeforeClass
public static void setUpBeforeClass() {
// disable the retry
CONF.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);
}
@Test
public void testSuccessivePut() throws Exception {
MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF);
List<Put> puts = new ArrayList<>(1);
puts.add(new Put(GOOD_ROW).addColumn(FAMILY, FAMILY, FAMILY));
final int expectedSize = puts.size();
AsyncRequestFuture arf = ap.submit(DUMMY_TABLE, puts);
arf.waitUntilDone();
Object[] result = arf.getResults();
assertEquals(expectedSize, result.length);
for (Object r : result) {
assertEquals(Result.class, r.getClass());
}
assertTrue(puts.isEmpty());
assertActionsInProgress(arf);
}
@Test
public void testFailedPut() throws Exception {
MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF);
List<Put> puts = new ArrayList<>(2);
puts.add(new Put(GOOD_ROW).addColumn(FAMILY, FAMILY, FAMILY));
// this put should fail
puts.add(new Put(BAD_ROW).addColumn(FAMILY, FAMILY, FAMILY));
final int expectedSize = puts.size();
AsyncRequestFuture arf = ap.submit(DUMMY_TABLE, puts);
arf.waitUntilDone();
// There is a failed puts
assertError(arf, 1);
Object[] result = arf.getResults();
assertEquals(expectedSize, result.length);
assertEquals(Result.class, result[0].getClass());
assertTrue(result[1] instanceof IOException);
assertTrue(puts.isEmpty());
assertActionsInProgress(arf);
}
@Test
public void testFailedPutWithoutActionException() throws Exception {
MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF);
List<Put> puts = new ArrayList<>(3);
puts.add(new Put(GOOD_ROW).addColumn(FAMILY, FAMILY, FAMILY));
// this put should fail
puts.add(new Put(BAD_ROW).addColumn(FAMILY, FAMILY, FAMILY));
// this put should fail, and it won't have action exception
puts.add(new Put(BAD_ROW_WITHOUT_ACTION_EXCEPTION).addColumn(FAMILY, FAMILY, FAMILY));
final int expectedSize = puts.size();
AsyncRequestFuture arf = ap.submit(DUMMY_TABLE, puts);
arf.waitUntilDone();
// There are two failed puts
assertError(arf, 2);
Object[] result = arf.getResults();
assertEquals(expectedSize, result.length);
assertEquals(Result.class, result[0].getClass());
assertTrue(result[1] instanceof IOException);
assertTrue(result[2] instanceof IOException);
assertTrue(puts.isEmpty());
assertActionsInProgress(arf);
}
private static void assertError(AsyncRequestFuture arf, int expectedCountOfFailure) {
assertTrue(arf.hasError());
RetriesExhaustedWithDetailsException e = arf.getErrors();
List<Throwable> errors = e.getCauses();
assertEquals(expectedCountOfFailure, errors.size());
for (Throwable t : errors) {
assertTrue(t instanceof IOException);
}
}
private static void assertActionsInProgress(AsyncRequestFuture arf) {
if (arf instanceof AsyncRequestFutureImpl) {
assertEquals(0, ((AsyncRequestFutureImpl) arf).getNumberOfActionsInProgress());
}
}
private static ClusterConnection createHConnection() throws IOException {
ClusterConnection hc = Mockito.mock(ClusterConnection.class);
NonceGenerator ng = Mockito.mock(NonceGenerator.class);
Mockito.when(ng.getNonceGroup()).thenReturn(HConstants.NO_NONCE);
Mockito.when(hc.getNonceGenerator()).thenReturn(ng);
Mockito.when(hc.getConfiguration()).thenReturn(CONF);
Mockito.when(hc.getConnectionConfiguration()).thenReturn(new ConnectionConfiguration(CONF));
setMockLocation(hc, GOOD_ROW, new RegionLocations(REGION_LOCATION));
setMockLocation(hc, BAD_ROW, new RegionLocations(REGION_LOCATION));
Mockito
.when(hc.locateRegions(Mockito.eq(DUMMY_TABLE), Mockito.anyBoolean(), Mockito.anyBoolean()))
.thenReturn(Collections.singletonList(REGION_LOCATION));
return hc;
}
private static void setMockLocation(ClusterConnection hc, byte[] row, RegionLocations result)
throws IOException {
Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE), Mockito.eq(row), Mockito.anyBoolean(),
Mockito.anyBoolean(), Mockito.anyInt())).thenReturn(result);
Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE), Mockito.eq(row), Mockito.anyBoolean(),
Mockito.anyBoolean())).thenReturn(result);
}
private static class MyAsyncProcess extends AsyncProcess {
private final ExecutorService service = Executors.newFixedThreadPool(5);
MyAsyncProcess(ClusterConnection hc, Configuration conf) {
super(hc, conf, new RpcRetryingCallerFactory(conf), false, new RpcControllerFactory(conf));
}
public AsyncRequestFuture submit(TableName tableName, List<? extends Row> rows)
throws InterruptedIOException {
return submit(AsyncProcessTask.newBuilder()
.setPool(service)
.setTableName(tableName)
.setRowAccess(rows)
.setSubmittedRows(AsyncProcessTask.SubmittedRows.NORMAL)
.setNeedResults(true)
.setRpcTimeout(HConstants.DEFAULT_HBASE_RPC_TIMEOUT)
.setOperationTimeout(HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT)
.build());
}
@Override
protected RpcRetryingCaller<AbstractResponse> createCaller(
CancellableRegionServerCallable callable, int rpcTimeout) {
MultiServerCallable callable1 = (MultiServerCallable) callable;
MultiResponse mr = new MultiResponse();
callable1.getMulti().actions.forEach((regionName, actions) -> {
actions.forEach(action -> {
if (Bytes.equals(action.getAction().getRow(), GOOD_ROW)) {
mr.add(regionName, action.getOriginalIndex(), EMPTY_RESULT);
} else if (Bytes.equals(action.getAction().getRow(), BAD_ROW)) {
mr.add(regionName, action.getOriginalIndex(), IOE);
}
});
});
mr.addException(REGION_INFO.getRegionName(), IOE);
return new RpcRetryingCallerImpl<AbstractResponse>(100, 500, 0, 9) {
@Override
public AbstractResponse callWithoutRetries(RetryingCallable<AbstractResponse> callable,
int callTimeout) {
try {
// sleep one second in order for threadpool to start another thread instead of reusing
// existing one.
Thread.sleep(1000);
} catch (InterruptedException e) {
// pass
}
return mr;
}
};
}
}
}

View File

@ -0,0 +1,173 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
/**
* The purpose of this test is to make sure the region exception won't corrupt the results
* of batch. The prescription is shown below.
* 1) honor the action result rather than region exception. If the action have both of true result
* and region exception, the action is fine as the exception is caused by other actions
* which are in the same region.
* 2) honor the action exception rather than region exception. If the action have both of action
* exception and region exception, we deal with the action exception only. If we also
* handle the region exception for the same action, it will introduce the negative count of
* actions in progress. The AsyncRequestFuture#waitUntilDone will block forever.
*
* The no-cluster test is in TestAsyncProcessWithRegionException.
*/
@Category({ MediumTests.class, ClientTests.class })
public class TestMalformedCellFromClient {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestMalformedCellFromClient.class);
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static final byte[] FAMILY = Bytes.toBytes("testFamily");
private static final int CELL_SIZE = 100;
private static final TableName TABLE_NAME = TableName.valueOf("TestMalformedCellFromClient");
@BeforeClass
public static void setUpBeforeClass() throws Exception {
// disable the retry
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);
TEST_UTIL.startMiniCluster(1);
}
@Before
public void before() throws Exception {
TableDescriptor desc = TableDescriptorBuilder.newBuilder(TABLE_NAME)
.addColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY))
.setValue(HRegion.HBASE_MAX_CELL_SIZE_KEY, String.valueOf(CELL_SIZE)).build();
TEST_UTIL.getConnection().getAdmin().createTable(desc);
}
@After
public void tearDown() throws Exception {
for (TableDescriptor htd : TEST_UTIL.getAdmin().listTableDescriptors()) {
TEST_UTIL.deleteTable(htd.getTableName());
}
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
TEST_UTIL.shutdownMiniCluster();
}
/**
* The purpose of this ut is to check the consistency between the exception and results.
* If the RetriesExhaustedWithDetailsException contains the whole batch,
* each result should be of IOE. Otherwise, the row operation which is not in the exception
* should have a true result.
*/
@Test
public void testRegionException() throws InterruptedException, IOException {
List<Row> batches = new ArrayList<>();
batches.add(new Put(Bytes.toBytes("good")).addColumn(FAMILY, null, new byte[10]));
// the rm is used to prompt the region exception.
// see RSRpcServices#multi
RowMutations rm = new RowMutations(Bytes.toBytes("fail"));
rm.add(new Put(rm.getRow()).addColumn(FAMILY, null, new byte[CELL_SIZE]));
batches.add(rm);
Object[] results = new Object[batches.size()];
try (Table table = TEST_UTIL.getConnection().getTable(TABLE_NAME)) {
Throwable exceptionByCaught = null;
try {
table.batch(batches, results);
fail("Where is the exception? We put the malformed cells!!!");
} catch (RetriesExhaustedWithDetailsException e) {
for (Throwable throwable : e.getCauses()) {
assertNotNull(throwable);
}
assertEquals(1, e.getNumExceptions());
exceptionByCaught = e.getCause(0);
}
for (Object obj : results) {
assertNotNull(obj);
}
assertEquals(Result.class, results[0].getClass());
assertEquals(exceptionByCaught.getClass(), results[1].getClass());
Result result = table.get(new Get(Bytes.toBytes("good")));
assertEquals(1, result.size());
Cell cell = result.getColumnLatestCell(FAMILY, null);
assertTrue(Bytes.equals(CellUtil.cloneValue(cell), new byte[10]));
}
}
/**
* The purpose of this ut is to check the consistency between the exception and results.
* If the RetriesExhaustedWithDetailsException contains the whole batch,
* each result should be of IOE. Otherwise, the row operation which is not in the exception
* should have a true result.
*/
@Test
public void testRegionExceptionByAsync() throws Exception {
List<Row> batches = new ArrayList<>();
batches.add(new Put(Bytes.toBytes("good")).addColumn(FAMILY, null, new byte[10]));
// the rm is used to prompt the region exception.
// see RSRpcServices#multi
RowMutations rm = new RowMutations(Bytes.toBytes("fail"));
rm.add(new Put(rm.getRow()).addColumn(FAMILY, null, new byte[CELL_SIZE]));
batches.add(rm);
try (AsyncConnection asyncConnection = ConnectionFactory
.createAsyncConnection(TEST_UTIL.getConfiguration()).get()) {
AsyncTable<AdvancedScanResultConsumer> table = asyncConnection.getTable(TABLE_NAME);
List<CompletableFuture<AdvancedScanResultConsumer>> results = table.batch(batches);
assertEquals(2, results.size());
try {
results.get(1).get();
fail("Where is the exception? We put the malformed cells!!!");
} catch (ExecutionException e) {
// pass
}
Result result = table.get(new Get(Bytes.toBytes("good"))).get();
assertEquals(1, result.size());
Cell cell = result.getColumnLatestCell(FAMILY, null);
assertTrue(Bytes.equals(CellUtil.cloneValue(cell), new byte[10]));
}
}
}