HBASE-19876 The exception happening in converting pb mutation to hbase.mutation messes up the CellScanner

This commit is contained in:
Chia-Ping Tsai 2018-02-11 03:49:53 +08:00
parent 3623089cba
commit 6923472f75
3 changed files with 263 additions and 84 deletions

View File

@ -473,7 +473,7 @@ public final class RequestConverter {
return regionActionBuilder; return regionActionBuilder;
} }
private static RegionAction.Builder getRegionActionBuilderWithRegion( public static RegionAction.Builder getRegionActionBuilderWithRegion(
final RegionAction.Builder regionActionBuilder, final byte [] regionName) { final RegionAction.Builder regionActionBuilder, final byte [] regionName) {
RegionSpecifier region = buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName); RegionSpecifier region = buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName);
regionActionBuilder.setRegion(region); regionActionBuilder.setRegion(region);
@ -1099,7 +1099,7 @@ public final class RequestConverter {
* @return a Condition * @return a Condition
* @throws IOException * @throws IOException
*/ */
private static Condition buildCondition(final byte[] row, final byte[] family, public static Condition buildCondition(final byte[] row, final byte[] family,
final byte[] qualifier, final ByteArrayComparable comparator, final CompareType compareType) final byte[] qualifier, final ByteArrayComparable comparator, final CompareType compareType)
throws IOException { throws IOException {
Condition.Builder builder = Condition.newBuilder(); Condition.Builder builder = Condition.newBuilder();

View File

@ -555,23 +555,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
} }
} }
/**
* Mutate a list of rows atomically.
* @param cellScanner if non-null, the mutation data -- the Cell content.
*/
private void mutateRows(final HRegion region, final OperationQuota quota,
final List<ClientProtos.Action> actions, final CellScanner cellScanner,
RegionActionResult.Builder builder, final ActivePolicyEnforcement spaceQuotaEnforcement)
throws IOException {
for (ClientProtos.Action action: actions) {
if (action.hasGet()) {
throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" +
action.getGet());
}
}
doBatchOp(builder, region, quota, actions, cellScanner, spaceQuotaEnforcement, true);
}
/** /**
* Mutate a list of rows atomically. * Mutate a list of rows atomically.
* @param cellScanner if non-null, the mutation data -- the Cell content. * @param cellScanner if non-null, the mutation data -- the Cell content.
@ -580,46 +563,56 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
final CellScanner cellScanner, byte[] row, byte[] family, byte[] qualifier, final CellScanner cellScanner, byte[] row, byte[] family, byte[] qualifier,
CompareOperator op, ByteArrayComparable comparator, RegionActionResult.Builder builder, CompareOperator op, ByteArrayComparable comparator, RegionActionResult.Builder builder,
ActivePolicyEnforcement spaceQuotaEnforcement) throws IOException { ActivePolicyEnforcement spaceQuotaEnforcement) throws IOException {
if (!region.getRegionInfo().isMetaRegion()) { int countOfCompleteMutation = 0;
regionServer.cacheFlusher.reclaimMemStoreMemory(); try {
} if (!region.getRegionInfo().isMetaRegion()) {
RowMutations rm = null; regionServer.cacheFlusher.reclaimMemStoreMemory();
int i = 0; }
ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder = RowMutations rm = null;
int i = 0;
ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder =
ClientProtos.ResultOrException.newBuilder(); ClientProtos.ResultOrException.newBuilder();
for (ClientProtos.Action action: actions) { for (ClientProtos.Action action: actions) {
if (action.hasGet()) { if (action.hasGet()) {
throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" + throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" +
action.getGet()); action.getGet());
} }
MutationType type = action.getMutation().getMutateType(); MutationType type = action.getMutation().getMutateType();
if (rm == null) { if (rm == null) {
rm = new RowMutations(action.getMutation().getRow().toByteArray(), actions.size()); rm = new RowMutations(action.getMutation().getRow().toByteArray(), actions.size());
} }
switch (type) { switch (type) {
case PUT: case PUT:
Put put = ProtobufUtil.toPut(action.getMutation(), cellScanner); Put put = ProtobufUtil.toPut(action.getMutation(), cellScanner);
checkCellSizeLimit(region, put); ++countOfCompleteMutation;
spaceQuotaEnforcement.getPolicyEnforcement(region).check(put); checkCellSizeLimit(region, put);
rm.add(put); spaceQuotaEnforcement.getPolicyEnforcement(region).check(put);
break; rm.add(put);
case DELETE: break;
Delete del = ProtobufUtil.toDelete(action.getMutation(), cellScanner); case DELETE:
spaceQuotaEnforcement.getPolicyEnforcement(region).check(del); Delete del = ProtobufUtil.toDelete(action.getMutation(), cellScanner);
rm.add(del); ++countOfCompleteMutation;
break; spaceQuotaEnforcement.getPolicyEnforcement(region).check(del);
default: rm.add(del);
throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name()); break;
} default:
// To unify the response format with doNonAtomicRegionMutation and read through client's throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name());
// AsyncProcess we have to add an empty result instance per operation }
resultOrExceptionOrBuilder.clear(); // To unify the response format with doNonAtomicRegionMutation and read through client's
resultOrExceptionOrBuilder.setIndex(i++); // AsyncProcess we have to add an empty result instance per operation
builder.addResultOrException( resultOrExceptionOrBuilder.clear();
resultOrExceptionOrBuilder.setIndex(i++);
builder.addResultOrException(
resultOrExceptionOrBuilder.build()); resultOrExceptionOrBuilder.build());
}
return region.checkAndRowMutate(row, family, qualifier, op, comparator, rm);
} finally {
// Currently, the checkAndMutate isn't supported by batch so it won't mess up the cell scanner
// even if the malformed cells are not skipped.
for (int i = countOfCompleteMutation; i < actions.size(); ++i) {
skipCellsForMutation(actions.get(i), cellScanner);
}
} }
return region.checkAndRowMutate(row, family, qualifier, op,
comparator, rm);
} }
/** /**
@ -786,9 +779,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
context.incrementResponseExceptionSize(pair.getSerializedSize()); context.incrementResponseExceptionSize(pair.getSerializedSize());
resultOrExceptionBuilder.setIndex(action.getIndex()); resultOrExceptionBuilder.setIndex(action.getIndex());
builder.addResultOrException(resultOrExceptionBuilder.build()); builder.addResultOrException(resultOrExceptionBuilder.build());
if (cellScanner != null) { skipCellsForMutation(action, cellScanner);
skipCellsForMutation(action, cellScanner);
}
continue; continue;
} }
if (action.hasGet()) { if (action.hasGet()) {
@ -895,6 +886,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
try { try {
doBatchOp(builder, region, quota, mutations, cellScanner, spaceQuotaEnforcement, false); doBatchOp(builder, region, quota, mutations, cellScanner, spaceQuotaEnforcement, false);
} catch (IOException ioe) { } catch (IOException ioe) {
// TODO do the refactor to avoid this catch as it is useless
// doBatchOp has handled the IOE for all non-atomic operations.
rpcServer.getMetrics().exception(ioe); rpcServer.getMetrics().exception(ioe);
NameBytesPair pair = ResponseConverter.buildException(ioe); NameBytesPair pair = ResponseConverter.buildException(ioe);
resultOrExceptionBuilder.setException(pair); resultOrExceptionBuilder.setException(pair);
@ -946,6 +939,10 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
Map<Mutation, ClientProtos.Action> mutationActionMap = new HashMap<>(); Map<Mutation, ClientProtos.Action> mutationActionMap = new HashMap<>();
int i = 0; int i = 0;
for (ClientProtos.Action action: mutations) { for (ClientProtos.Action action: mutations) {
if (action.hasGet()) {
throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" +
action.getGet());
}
MutationProto m = action.getMutation(); MutationProto m = action.getMutation();
Mutation mutation; Mutation mutation;
if (m.getMutateType() == MutationType.PUT) { if (m.getMutateType() == MutationType.PUT) {
@ -968,8 +965,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
} }
// HBASE-17924 // HBASE-17924
// Sort to improve lock efficiency for non-atomic batch of operations. If atomic (mostly // Sort to improve lock efficiency for non-atomic batch of operations. If atomic
// called from mutateRows()), order is preserved as its expected from the client // order is preserved as its expected from the client
if (!atomic) { if (!atomic) {
Arrays.sort(mArray, (v1, v2) -> Row.COMPARATOR.compare(v1, v2)); Arrays.sort(mArray, (v1, v2) -> Row.COMPARATOR.compare(v1, v2));
} }
@ -1004,12 +1001,19 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
} }
} }
} catch (IOException ie) { } catch (IOException ie) {
int processedMutationIndex = 0;
for (Action mutation : mutations) {
// The non-null mArray[i] means the cell scanner has been read.
if (mArray[processedMutationIndex++] == null) {
skipCellsForMutation(mutation, cells);
}
if (!atomic) {
builder.addResultOrException(getResultOrException(ie, mutation.getIndex()));
}
}
if (atomic) { if (atomic) {
throw ie; throw ie;
} }
for (Action mutation : mutations) {
builder.addResultOrException(getResultOrException(ie, mutation.getIndex()));
}
} }
if (regionServer.metricsRegionServer != null) { if (regionServer.metricsRegionServer != null) {
long after = EnvironmentEdgeManager.currentTime(); long after = EnvironmentEdgeManager.currentTime();
@ -2549,9 +2553,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
// All Mutations in this RegionAction not executed as we can not see the Region online here // All Mutations in this RegionAction not executed as we can not see the Region online here
// in this RS. Will be retried from Client. Skipping all the Cells in CellScanner // in this RS. Will be retried from Client. Skipping all the Cells in CellScanner
// corresponding to these Mutations. // corresponding to these Mutations.
if (cellScanner != null) { skipCellsForMutations(regionAction.getActionList(), cellScanner);
skipCellsForMutations(regionAction.getActionList(), cellScanner);
}
continue; // For this region it's a failure. continue; // For this region it's a failure.
} }
@ -2572,8 +2574,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
cellScanner, row, family, qualifier, op, cellScanner, row, family, qualifier, op,
comparator, regionActionResultBuilder, spaceQuotaEnforcement); comparator, regionActionResultBuilder, spaceQuotaEnforcement);
} else { } else {
mutateRows(region, quota, regionAction.getActionList(), cellScanner, doBatchOp(regionActionResultBuilder, region, quota, regionAction.getActionList(),
regionActionResultBuilder, spaceQuotaEnforcement); cellScanner, spaceQuotaEnforcement, true);
processed = Boolean.TRUE; processed = Boolean.TRUE;
} }
} catch (IOException e) { } catch (IOException e) {
@ -2620,12 +2622,18 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
} }
private void skipCellsForMutations(List<Action> actions, CellScanner cellScanner) { private void skipCellsForMutations(List<Action> actions, CellScanner cellScanner) {
if (cellScanner == null) {
return;
}
for (Action action : actions) { for (Action action : actions) {
skipCellsForMutation(action, cellScanner); skipCellsForMutation(action, cellScanner);
} }
} }
private void skipCellsForMutation(Action action, CellScanner cellScanner) { private void skipCellsForMutation(Action action, CellScanner cellScanner) {
if (cellScanner == null) {
return;
}
try { try {
if (action.hasMutation()) { if (action.hasMutation()) {
MutationProto m = action.getMutation(); MutationProto m = action.getMutation();

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.client; package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
@ -29,11 +30,15 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
@ -44,23 +49,21 @@ import org.junit.BeforeClass;
import org.junit.ClassRule; import org.junit.ClassRule;
import org.junit.Test; import org.junit.Test;
import org.junit.experimental.categories.Category; import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
/** /**
* The purpose of this test is to make sure the region exception won't corrupt the results * The purpose of this test is to ensure whether rs deals with the malformed cells correctly.
* of batch. The prescription is shown below.
* 1) honor the action result rather than region exception. If the action have both of true result
* and region exception, the action is fine as the exception is caused by other actions
* which are in the same region.
* 2) honor the action exception rather than region exception. If the action have both of action
* exception and region exception, we deal with the action exception only. If we also
* handle the region exception for the same action, it will introduce the negative count of
* actions in progress. The AsyncRequestFuture#waitUntilDone will block forever.
*
* The no-cluster test is in TestAsyncProcessWithRegionException.
*/ */
@Category({ MediumTests.class, ClientTests.class }) @Category({ MediumTests.class, ClientTests.class })
public class TestMalformedCellFromClient { public class TestMalformedCellFromClient {
private static final Logger LOG = LoggerFactory.getLogger(TestMalformedCellFromClient.class);
@ClassRule @ClassRule
public static final HBaseClassTestRule CLASS_RULE = public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestMalformedCellFromClient.class); HBaseClassTestRule.forClass(TestMalformedCellFromClient.class);
@ -139,10 +142,16 @@ public class TestMalformedCellFromClient {
} }
/** /**
* The purpose of this ut is to check the consistency between the exception and results. * This test verifies region exception doesn't corrupt the results of batch. The prescription is
* If the RetriesExhaustedWithDetailsException contains the whole batch, * shown below. 1) honor the action result rather than region exception. If the action have both
* each result should be of IOE. Otherwise, the row operation which is not in the exception * of true result and region exception, the action is fine as the exception is caused by other
* should have a true result. * actions which are in the same region. 2) honor the action exception rather than region
* exception. If the action have both of action exception and region exception, we deal with the
* action exception only. If we also handle the region exception for the same action, it will
* introduce the negative count of actions in progress. The AsyncRequestFuture#waitUntilDone will
* block forever. If the RetriesExhaustedWithDetailsException contains the whole batch, each
* result should be of IOE. Otherwise, the row operation which is not in the exception should have
* a true result. The no-cluster test is in TestAsyncProcessWithRegionException.
*/ */
@Test @Test
public void testRegionExceptionByAsync() throws Exception { public void testRegionExceptionByAsync() throws Exception {
@ -170,4 +179,166 @@ public class TestMalformedCellFromClient {
assertTrue(Bytes.equals(CellUtil.cloneValue(cell), new byte[10])); assertTrue(Bytes.equals(CellUtil.cloneValue(cell), new byte[10]));
} }
} }
/**
* The invalid cells is in rm. The rm should fail but the subsequent mutations should succeed.
* Currently, we have no client api to submit the request consisting of condition-rm and mutation.
* Hence, this test build the request manually.
*/
@Test
public void testAtomicOperations() throws Exception {
RowMutations rm = new RowMutations(Bytes.toBytes("fail"));
rm.add(new Put(rm.getRow()).addColumn(FAMILY, null, new byte[CELL_SIZE]));
rm.add(new Put(rm.getRow()).addColumn(FAMILY, null, new byte[10]));
Put put = new Put(Bytes.toBytes("good")).addColumn(FAMILY, null, new byte[10]);
// build the request
HRegion r = TEST_UTIL.getMiniHBaseCluster().getRegions(TABLE_NAME).get(0);
ClientProtos.MultiRequest request =
ClientProtos.MultiRequest.newBuilder(createRequest(rm, r.getRegionInfo().getRegionName()))
.addRegionAction(ClientProtos.RegionAction.newBuilder().setRegion(RequestConverter
.buildRegionSpecifier(HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME,
r.getRegionInfo().getRegionName())).addAction(ClientProtos.Action.newBuilder()
.setMutation(
ProtobufUtil.toMutationNoData(ClientProtos.MutationProto.MutationType.PUT, put))))
.build();
List<Cell> cells = new ArrayList<>();
for (Mutation m : rm.getMutations()) {
cells.addAll(m.getCellList(FAMILY));
}
cells.addAll(put.getCellList(FAMILY));
assertEquals(3, cells.size());
HBaseRpcController controller = Mockito.mock(HBaseRpcController.class);
Mockito.when(controller.cellScanner()).thenReturn(CellUtil.createCellScanner(cells));
HRegionServer rs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(
TEST_UTIL.getMiniHBaseCluster()
.getServerHoldingRegion(TABLE_NAME, r.getRegionInfo().getRegionName()));
ClientProtos.MultiResponse response = rs.getRSRpcServices().multi(controller, request);
assertEquals(2, response.getRegionActionResultCount());
assertTrue(response.getRegionActionResultList().get(0).hasException());
assertFalse(response.getRegionActionResultList().get(1).hasException());
assertEquals(1, response.getRegionActionResultList().get(1).getResultOrExceptionCount());
assertTrue(
response.getRegionActionResultList().get(1).getResultOrExceptionList().get(0).hasResult());
try (Table table = TEST_UTIL.getConnection().getTable(TABLE_NAME)) {
Result result = table.get(new Get(Bytes.toBytes("good")));
assertEquals(1, result.size());
Cell cell = result.getColumnLatestCell(FAMILY, null);
assertTrue(Bytes.equals(CellUtil.cloneValue(cell), new byte[10]));
}
}
private static ClientProtos.MultiRequest createRequest(RowMutations rm, byte[] regionName)
throws IOException {
ClientProtos.RegionAction.Builder builder = RequestConverter
.getRegionActionBuilderWithRegion(ClientProtos.RegionAction.newBuilder(), regionName);
builder.setAtomic(true);
ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder();
ClientProtos.MutationProto.Builder mutationBuilder = ClientProtos.MutationProto.newBuilder();
ClientProtos.Condition condition = RequestConverter
.buildCondition(rm.getRow(), FAMILY, null, new BinaryComparator(new byte[10]),
HBaseProtos.CompareType.EQUAL);
for (Mutation mutation : rm.getMutations()) {
ClientProtos.MutationProto.MutationType mutateType = null;
if (mutation instanceof Put) {
mutateType = ClientProtos.MutationProto.MutationType.PUT;
} else if (mutation instanceof Delete) {
mutateType = ClientProtos.MutationProto.MutationType.DELETE;
} else {
throw new DoNotRetryIOException(
"RowMutations supports only put and delete, not " + mutation.getClass().getName());
}
mutationBuilder.clear();
ClientProtos.MutationProto mp =
ProtobufUtil.toMutationNoData(mutateType, mutation, mutationBuilder);
actionBuilder.clear();
actionBuilder.setMutation(mp);
builder.addAction(actionBuilder.build());
}
ClientProtos.MultiRequest request =
ClientProtos.MultiRequest.newBuilder().addRegionAction(builder.build())
.setCondition(condition).build();
return request;
}
/**
* This test depends on how regionserver process the batch ops.
* 1) group the put/delete until meeting the increment
* 2) process the batch of put/delete
* 3) process the increment
* see RSRpcServices#doNonAtomicRegionMutation
*/
@Test
public void testNonAtomicOperations() throws InterruptedException, IOException {
Increment inc = new Increment(Bytes.toBytes("good")).addColumn(FAMILY, null, 100);
List<Row> batches = new ArrayList<>();
// the first and second puts will be group by regionserver
batches.add(new Put(Bytes.toBytes("fail")).addColumn(FAMILY, null, new byte[CELL_SIZE]));
batches.add(new Put(Bytes.toBytes("fail")).addColumn(FAMILY, null, new byte[CELL_SIZE]));
// this Increment should succeed
batches.add(inc);
// this put should succeed
batches.add(new Put(Bytes.toBytes("good")).addColumn(FAMILY, null, new byte[1]));
Object[] objs = new Object[batches.size()];
try (Table table = TEST_UTIL.getConnection().getTable(TABLE_NAME)) {
table.batch(batches, objs);
fail("Where is the exception? We put the malformed cells!!!");
} catch (RetriesExhaustedWithDetailsException e) {
assertEquals(2, e.getNumExceptions());
for (int i = 0; i != e.getNumExceptions(); ++i) {
assertNotNull(e.getCause(i));
assertEquals(DoNotRetryIOException.class, e.getCause(i).getClass());
assertEquals("fail", Bytes.toString(e.getRow(i).getRow()));
}
} finally {
assertObjects(objs, batches.size());
assertTrue(objs[0] instanceof IOException);
assertTrue(objs[1] instanceof IOException);
assertEquals(Result.class, objs[2].getClass());
assertEquals(Result.class, objs[3].getClass());
}
}
@Test
public void testRowMutations() throws InterruptedException, IOException {
Put put = new Put(Bytes.toBytes("good")).addColumn(FAMILY, null, new byte[1]);
List<Row> batches = new ArrayList<>();
RowMutations mutations = new RowMutations(Bytes.toBytes("fail"));
// the first and second puts will be group by regionserver
mutations.add(new Put(Bytes.toBytes("fail")).addColumn(FAMILY, null, new byte[CELL_SIZE]));
mutations.add(new Put(Bytes.toBytes("fail")).addColumn(FAMILY, null, new byte[CELL_SIZE]));
batches.add(mutations);
// this bm should succeed
mutations = new RowMutations(Bytes.toBytes("good"));
mutations.add(put);
mutations.add(put);
batches.add(mutations);
Object[] objs = new Object[batches.size()];
try (Table table = TEST_UTIL.getConnection().getTable(TABLE_NAME)) {
table.batch(batches, objs);
fail("Where is the exception? We put the malformed cells!!!");
} catch (RetriesExhaustedWithDetailsException e) {
assertEquals(1, e.getNumExceptions());
for (int i = 0; i != e.getNumExceptions(); ++i) {
assertNotNull(e.getCause(i));
assertTrue(e.getCause(i) instanceof IOException);
assertEquals("fail", Bytes.toString(e.getRow(i).getRow()));
}
} finally {
assertObjects(objs, batches.size());
assertTrue(objs[0] instanceof IOException);
assertEquals(Result.class, objs[1].getClass());
}
}
private static void assertObjects(Object[] objs, int expectedSize) {
int count = 0;
for (Object obj : objs) {
assertNotNull(obj);
++count;
}
assertEquals(expectedSize, count);
}
} }