HBASE-19876 The exception happening in converting pb mutation to hbase.mutation messes up the CellScanner
This commit is contained in:
parent
0f79c497c5
commit
f77b42b2af
|
@ -473,7 +473,7 @@ public final class RequestConverter {
|
|||
return regionActionBuilder;
|
||||
}
|
||||
|
||||
private static RegionAction.Builder getRegionActionBuilderWithRegion(
|
||||
public static RegionAction.Builder getRegionActionBuilderWithRegion(
|
||||
final RegionAction.Builder regionActionBuilder, final byte [] regionName) {
|
||||
RegionSpecifier region = buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName);
|
||||
regionActionBuilder.setRegion(region);
|
||||
|
@ -997,7 +997,7 @@ public final class RequestConverter {
|
|||
}
|
||||
|
||||
/**
|
||||
* @see {@link #buildRollWALWriterRequest()
|
||||
* @see {@link #buildRollWALWriterRequest()}
|
||||
*/
|
||||
private static RollWALWriterRequest ROLL_WAL_WRITER_REQUEST =
|
||||
RollWALWriterRequest.newBuilder().build();
|
||||
|
@ -1066,7 +1066,7 @@ public final class RequestConverter {
|
|||
* @return a Condition
|
||||
* @throws IOException
|
||||
*/
|
||||
private static Condition buildCondition(final byte[] row,
|
||||
public static Condition buildCondition(final byte[] row,
|
||||
final byte[] family, final byte [] qualifier,
|
||||
final ByteArrayComparable comparator,
|
||||
final CompareType compareType) throws IOException {
|
||||
|
|
|
@ -458,42 +458,54 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
private void mutateRows(final Region region,
|
||||
final List<ClientProtos.Action> actions,
|
||||
final CellScanner cellScanner, RegionActionResult.Builder builder) throws IOException {
|
||||
if (!region.getRegionInfo().isMetaTable()) {
|
||||
regionServer.cacheFlusher.reclaimMemStoreMemory();
|
||||
}
|
||||
RowMutations rm = null;
|
||||
int i = 0;
|
||||
ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder =
|
||||
int countOfCompleteMutation = 0;
|
||||
try {
|
||||
if (!region.getRegionInfo().isMetaTable()) {
|
||||
regionServer.cacheFlusher.reclaimMemStoreMemory();
|
||||
}
|
||||
RowMutations rm = null;
|
||||
int i = 0;
|
||||
ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder =
|
||||
ClientProtos.ResultOrException.newBuilder();
|
||||
for (ClientProtos.Action action: actions) {
|
||||
if (action.hasGet()) {
|
||||
throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" +
|
||||
action.getGet());
|
||||
}
|
||||
MutationType type = action.getMutation().getMutateType();
|
||||
if (rm == null) {
|
||||
rm = new RowMutations(action.getMutation().getRow().toByteArray());
|
||||
}
|
||||
switch (type) {
|
||||
case PUT:
|
||||
Put put = ProtobufUtil.toPut(action.getMutation(), cellScanner);
|
||||
checkCellSizeLimit(region, put);
|
||||
rm.add(put);
|
||||
break;
|
||||
case DELETE:
|
||||
rm.add(ProtobufUtil.toDelete(action.getMutation(), cellScanner));
|
||||
break;
|
||||
default:
|
||||
throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name());
|
||||
}
|
||||
// To unify the response format with doNonAtomicRegionMutation and read through client's
|
||||
// AsyncProcess we have to add an empty result instance per operation
|
||||
resultOrExceptionOrBuilder.clear();
|
||||
resultOrExceptionOrBuilder.setIndex(i++);
|
||||
builder.addResultOrException(
|
||||
for (ClientProtos.Action action: actions) {
|
||||
if (action.hasGet()) {
|
||||
throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" +
|
||||
action.getGet());
|
||||
}
|
||||
MutationType type = action.getMutation().getMutateType();
|
||||
if (rm == null) {
|
||||
rm = new RowMutations(action.getMutation().getRow().toByteArray());
|
||||
}
|
||||
switch (type) {
|
||||
case PUT:
|
||||
Put put = ProtobufUtil.toPut(action.getMutation(), cellScanner);
|
||||
++countOfCompleteMutation;
|
||||
checkCellSizeLimit(region, put);
|
||||
rm.add(put);
|
||||
break;
|
||||
case DELETE:
|
||||
Delete delete = ProtobufUtil.toDelete(action.getMutation(), cellScanner);
|
||||
++countOfCompleteMutation;
|
||||
rm.add(delete);
|
||||
break;
|
||||
default:
|
||||
throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name());
|
||||
}
|
||||
// To unify the response format with doNonAtomicRegionMutation and read through client's
|
||||
// AsyncProcess we have to add an empty result instance per operation
|
||||
resultOrExceptionOrBuilder.clear();
|
||||
resultOrExceptionOrBuilder.setIndex(i++);
|
||||
builder.addResultOrException(
|
||||
resultOrExceptionOrBuilder.build());
|
||||
}
|
||||
region.mutateRow(rm);
|
||||
} finally {
|
||||
// Currently, the checkAndMutate isn't supported by batch so it won't mess up the cell scanner
|
||||
// even if the malformed cells are not skipped.
|
||||
for (int i = countOfCompleteMutation; i < actions.size(); ++i) {
|
||||
skipCellsForMutation(actions.get(i), cellScanner);
|
||||
}
|
||||
}
|
||||
region.mutateRow(rm);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -512,43 +524,55 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
final CellScanner cellScanner, byte[] row, byte[] family, byte[] qualifier,
|
||||
CompareOp compareOp, ByteArrayComparable comparator,
|
||||
RegionActionResult.Builder builder) throws IOException {
|
||||
if (!region.getRegionInfo().isMetaTable()) {
|
||||
regionServer.cacheFlusher.reclaimMemStoreMemory();
|
||||
}
|
||||
RowMutations rm = null;
|
||||
int i = 0;
|
||||
ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder =
|
||||
int countOfCompleteMutation = 0;
|
||||
try {
|
||||
if (!region.getRegionInfo().isMetaTable()) {
|
||||
regionServer.cacheFlusher.reclaimMemStoreMemory();
|
||||
}
|
||||
RowMutations rm = null;
|
||||
int i = 0;
|
||||
ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder =
|
||||
ClientProtos.ResultOrException.newBuilder();
|
||||
for (ClientProtos.Action action: actions) {
|
||||
if (action.hasGet()) {
|
||||
throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" +
|
||||
for (ClientProtos.Action action: actions) {
|
||||
if (action.hasGet()) {
|
||||
throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" +
|
||||
action.getGet());
|
||||
}
|
||||
MutationType type = action.getMutation().getMutateType();
|
||||
if (rm == null) {
|
||||
rm = new RowMutations(action.getMutation().getRow().toByteArray());
|
||||
}
|
||||
switch (type) {
|
||||
case PUT:
|
||||
Put put = ProtobufUtil.toPut(action.getMutation(), cellScanner);
|
||||
checkCellSizeLimit(region, put);
|
||||
rm.add(put);
|
||||
break;
|
||||
case DELETE:
|
||||
rm.add(ProtobufUtil.toDelete(action.getMutation(), cellScanner));
|
||||
break;
|
||||
default:
|
||||
throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name());
|
||||
}
|
||||
// To unify the response format with doNonAtomicRegionMutation and read through client's
|
||||
// AsyncProcess we have to add an empty result instance per operation
|
||||
resultOrExceptionOrBuilder.clear();
|
||||
resultOrExceptionOrBuilder.setIndex(i++);
|
||||
builder.addResultOrException(
|
||||
}
|
||||
MutationType type = action.getMutation().getMutateType();
|
||||
if (rm == null) {
|
||||
rm = new RowMutations(action.getMutation().getRow().toByteArray());
|
||||
}
|
||||
switch (type) {
|
||||
case PUT:
|
||||
Put put = ProtobufUtil.toPut(action.getMutation(), cellScanner);
|
||||
++countOfCompleteMutation;
|
||||
checkCellSizeLimit(region, put);
|
||||
rm.add(put);
|
||||
break;
|
||||
case DELETE:
|
||||
Delete delete = ProtobufUtil.toDelete(action.getMutation(), cellScanner);
|
||||
++countOfCompleteMutation;
|
||||
rm.add(delete);
|
||||
break;
|
||||
default:
|
||||
throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name());
|
||||
}
|
||||
// To unify the response format with doNonAtomicRegionMutation and read through client's
|
||||
// AsyncProcess we have to add an empty result instance per operation
|
||||
resultOrExceptionOrBuilder.clear();
|
||||
resultOrExceptionOrBuilder.setIndex(i++);
|
||||
builder.addResultOrException(
|
||||
resultOrExceptionOrBuilder.build());
|
||||
}
|
||||
return region.checkAndRowMutate(row, family, qualifier, compareOp,
|
||||
}
|
||||
return region.checkAndRowMutate(row, family, qualifier, compareOp,
|
||||
comparator, rm, Boolean.TRUE);
|
||||
} finally {
|
||||
// Currently, the checkAndMutate isn't supported by batch so it won't mess up the cell scanner
|
||||
// even if the malformed cells are not skipped.
|
||||
for (int i = countOfCompleteMutation; i < actions.size(); ++i) {
|
||||
skipCellsForMutation(actions.get(i), cellScanner);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -719,9 +743,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
context.incrementResponseExceptionSize(pair.getSerializedSize());
|
||||
resultOrExceptionBuilder.setIndex(action.getIndex());
|
||||
builder.addResultOrException(resultOrExceptionBuilder.build());
|
||||
if (cellScanner != null) {
|
||||
skipCellsForMutation(action, cellScanner);
|
||||
}
|
||||
skipCellsForMutation(action, cellScanner);
|
||||
continue;
|
||||
}
|
||||
if (action.hasGet()) {
|
||||
|
@ -919,8 +941,13 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
}
|
||||
}
|
||||
} catch (IOException ie) {
|
||||
for (int i = 0; i < mutations.size(); i++) {
|
||||
builder.addResultOrException(getResultOrException(ie, mutations.get(i).getIndex()));
|
||||
int processedMutationIndex = 0;
|
||||
for (Action mutation : mutations) {
|
||||
// The non-null mArray[i] means the cell scanner has been read.
|
||||
if (mArray[processedMutationIndex++] == null) {
|
||||
skipCellsForMutation(mutation, cells);
|
||||
}
|
||||
builder.addResultOrException(getResultOrException(ie, mutation.getIndex()));
|
||||
}
|
||||
}
|
||||
if (regionServer.metricsRegionServer != null) {
|
||||
|
@ -2338,9 +2365,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
// All Mutations in this RegionAction not executed as we can not see the Region online here
|
||||
// in this RS. Will be retried from Client. Skipping all the Cells in CellScanner
|
||||
// corresponding to these Mutations.
|
||||
if (cellScanner != null) {
|
||||
skipCellsForMutations(regionAction.getActionList(), cellScanner);
|
||||
}
|
||||
skipCellsForMutations(regionAction.getActionList(), cellScanner);
|
||||
continue; // For this region it's a failure.
|
||||
}
|
||||
|
||||
|
@ -2400,12 +2425,18 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
}
|
||||
|
||||
private void skipCellsForMutations(List<Action> actions, CellScanner cellScanner) {
|
||||
if (cellScanner == null) {
|
||||
return;
|
||||
}
|
||||
for (Action action : actions) {
|
||||
skipCellsForMutation(action, cellScanner);
|
||||
}
|
||||
}
|
||||
|
||||
private void skipCellsForMutation(Action action, CellScanner cellScanner) {
|
||||
if (cellScanner == null) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
if (action.hasMutation()) {
|
||||
MutationProto m = action.getMutation();
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
@ -25,14 +26,24 @@ import static org.junit.Assert.fail;
|
|||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.CellUtil;
|
||||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.filter.BinaryComparator;
|
||||
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
|
||||
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.protobuf.RequestConverter;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
|
@ -42,23 +53,14 @@ import org.junit.Before;
|
|||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
/**
|
||||
* The purpose of this test is to make sure the region exception won't corrupt the results
|
||||
* of batch. The prescription is shown below.
|
||||
* 1) honor the action result rather than region exception. If the action have both of true result
|
||||
* and region exception, the action is fine as the exception is caused by other actions
|
||||
* which are in the same region.
|
||||
* 2) honor the action exception rather than region exception. If the action have both of action
|
||||
* exception and region exception, we deal with the action exception only. If we also
|
||||
* handle the region exception for the same action, it will introduce the negative count of
|
||||
* actions in progress. The AsyncRequestFuture#waitUntilDone will block forever.
|
||||
*
|
||||
* The no-cluster test is in TestAsyncProcessWithRegionException.
|
||||
* The purpose of this test is to ensure whether rs deals with the malformed cells correctly.
|
||||
*/
|
||||
@Category({ MediumTests.class, ClientTests.class })
|
||||
public class TestMalformedCellFromClient {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(TestMalformedCellFromClient.class);
|
||||
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||
private static final byte[] FAMILY = Bytes.toBytes("testFamily");
|
||||
private static final int CELL_SIZE = 100;
|
||||
|
@ -73,9 +75,9 @@ public class TestMalformedCellFromClient {
|
|||
|
||||
@Before
|
||||
public void before() throws Exception {
|
||||
HTableDescriptor desc = new HTableDescriptor(TABLE_NAME)
|
||||
.addFamily(new HColumnDescriptor(FAMILY))
|
||||
.setValue(HRegion.HBASE_MAX_CELL_SIZE_KEY, String.valueOf(CELL_SIZE));
|
||||
HTableDescriptor desc =
|
||||
new HTableDescriptor(TABLE_NAME).addFamily(new HColumnDescriptor(FAMILY))
|
||||
.setValue(HRegion.HBASE_MAX_CELL_SIZE_KEY, String.valueOf(CELL_SIZE));
|
||||
TEST_UTIL.getConnection().getAdmin().createTable(desc);
|
||||
}
|
||||
|
||||
|
@ -92,12 +94,18 @@ public class TestMalformedCellFromClient {
|
|||
}
|
||||
|
||||
/**
|
||||
* The purpose of this ut is to check the consistency between the exception and results.
|
||||
* If the RetriesExhaustedWithDetailsException contains the whole batch,
|
||||
* each result should be of IOE. Otherwise, the row operation which is not in the exception
|
||||
* should have a true result.
|
||||
* This test verifies region exception doesn't corrupt the results of batch. The prescription is
|
||||
* shown below. 1) honor the action result rather than region exception. If the action have both
|
||||
* of true result and region exception, the action is fine as the exception is caused by other
|
||||
* actions which are in the same region. 2) honor the action exception rather than region
|
||||
* exception. If the action have both of action exception and region exception, we deal with the
|
||||
* action exception only. If we also handle the region exception for the same action, it will
|
||||
* introduce the negative count of actions in progress. The AsyncRequestFuture#waitUntilDone will
|
||||
* block forever. If the RetriesExhaustedWithDetailsException contains the whole batch, each
|
||||
* result should be of IOE. Otherwise, the row operation which is not in the exception should have
|
||||
* a true result. The no-cluster test is in TestAsyncProcessWithRegionException.
|
||||
*/
|
||||
@Test(timeout=60000)
|
||||
@Test(timeout = 60000)
|
||||
public void testRegionException() throws InterruptedException, IOException {
|
||||
List<Row> batches = new ArrayList<>();
|
||||
batches.add(new Put(Bytes.toBytes("good")).addColumn(FAMILY, null, new byte[10]));
|
||||
|
@ -131,4 +139,163 @@ public class TestMalformedCellFromClient {
|
|||
assertTrue(Bytes.equals(CellUtil.cloneValue(cell), new byte[10]));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* The invalid cells is in rm. The rm should fail but the subsequent mutations should succeed.
|
||||
* Currently, we have no client api to submit the request consisting of condition-rm and mutation.
|
||||
* Hence, this test build the request manually.
|
||||
*/
|
||||
@Test
|
||||
public void testAtomicOperations() throws Exception {
|
||||
RowMutations rm = new RowMutations(Bytes.toBytes("fail"));
|
||||
rm.add(new Put(rm.getRow()).addColumn(FAMILY, null, new byte[CELL_SIZE]));
|
||||
rm.add(new Put(rm.getRow()).addColumn(FAMILY, null, new byte[10]));
|
||||
Put put = new Put(Bytes.toBytes("good")).addColumn(FAMILY, null, new byte[10]);
|
||||
|
||||
// build the request
|
||||
HRegion r = TEST_UTIL.getMiniHBaseCluster().getRegions(TABLE_NAME).get(0);
|
||||
ClientProtos.MultiRequest request =
|
||||
ClientProtos.MultiRequest.newBuilder(createRequest(rm, r.getRegionInfo().getRegionName()))
|
||||
.addRegionAction(ClientProtos.RegionAction.newBuilder().setRegion(RequestConverter
|
||||
.buildRegionSpecifier(HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME,
|
||||
r.getRegionInfo().getRegionName())).addAction(ClientProtos.Action.newBuilder()
|
||||
.setMutation(
|
||||
ProtobufUtil.toMutationNoData(ClientProtos.MutationProto.MutationType.PUT, put))))
|
||||
.build();
|
||||
|
||||
List<Cell> cells = new ArrayList<>();
|
||||
for (Mutation m : rm.getMutations()) {
|
||||
cells.addAll(m.getCellList(FAMILY));
|
||||
}
|
||||
cells.addAll(put.getCellList(FAMILY));
|
||||
assertEquals(3, cells.size());
|
||||
HBaseRpcController controller = Mockito.mock(HBaseRpcController.class);
|
||||
Mockito.when(controller.cellScanner()).thenReturn(CellUtil.createCellScanner(cells));
|
||||
HRegionServer rs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(
|
||||
TEST_UTIL.getMiniHBaseCluster()
|
||||
.getServerHoldingRegion(TABLE_NAME, r.getRegionInfo().getRegionName()));
|
||||
ClientProtos.MultiResponse response = rs.getRSRpcServices().multi(controller, request);
|
||||
assertEquals(2, response.getRegionActionResultCount());
|
||||
assertTrue(response.getRegionActionResultList().get(0).hasException());
|
||||
assertFalse(response.getRegionActionResultList().get(1).hasException());
|
||||
assertEquals(1, response.getRegionActionResultList().get(1).getResultOrExceptionCount());
|
||||
assertTrue(
|
||||
response.getRegionActionResultList().get(1).getResultOrExceptionList().get(0).hasResult());
|
||||
try (Table table = TEST_UTIL.getConnection().getTable(TABLE_NAME)) {
|
||||
Result result = table.get(new Get(Bytes.toBytes("good")));
|
||||
assertEquals(1, result.size());
|
||||
Cell cell = result.getColumnLatestCell(FAMILY, null);
|
||||
assertTrue(Bytes.equals(CellUtil.cloneValue(cell), new byte[10]));
|
||||
}
|
||||
}
|
||||
|
||||
private static ClientProtos.MultiRequest createRequest(RowMutations rm, byte[] regionName)
|
||||
throws IOException {
|
||||
ClientProtos.RegionAction.Builder builder = RequestConverter
|
||||
.getRegionActionBuilderWithRegion(ClientProtos.RegionAction.newBuilder(), regionName);
|
||||
builder.setAtomic(true);
|
||||
ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder();
|
||||
ClientProtos.MutationProto.Builder mutationBuilder = ClientProtos.MutationProto.newBuilder();
|
||||
ClientProtos.Condition condition = RequestConverter
|
||||
.buildCondition(rm.getRow(), FAMILY, FAMILY, new BinaryComparator(new byte[10]),
|
||||
HBaseProtos.CompareType.EQUAL);
|
||||
for (Mutation mutation : rm.getMutations()) {
|
||||
ClientProtos.MutationProto.MutationType mutateType = null;
|
||||
if (mutation instanceof Put) {
|
||||
mutateType = ClientProtos.MutationProto.MutationType.PUT;
|
||||
} else if (mutation instanceof Delete) {
|
||||
mutateType = ClientProtos.MutationProto.MutationType.DELETE;
|
||||
} else {
|
||||
throw new DoNotRetryIOException(
|
||||
"RowMutations supports only put and delete, not " + mutation.getClass().getName());
|
||||
}
|
||||
mutationBuilder.clear();
|
||||
ClientProtos.MutationProto mp =
|
||||
ProtobufUtil.toMutationNoData(mutateType, mutation, mutationBuilder);
|
||||
actionBuilder.clear();
|
||||
actionBuilder.setMutation(mp);
|
||||
builder.addAction(actionBuilder.build());
|
||||
}
|
||||
ClientProtos.MultiRequest request =
|
||||
ClientProtos.MultiRequest.newBuilder().addRegionAction(builder.build())
|
||||
.setCondition(condition).build();
|
||||
return request;
|
||||
}
|
||||
|
||||
/**
|
||||
* This test depends on how regionserver process the batch ops. 1) group the put/delete until
|
||||
* meeting the increment 2) process the batch of put/delete 3) process the increment see
|
||||
* RSRpcServices#doNonAtomicRegionMutation
|
||||
*/
|
||||
@Test
|
||||
public void testNonAtomicOperations() throws InterruptedException, IOException {
|
||||
Increment inc = new Increment(Bytes.toBytes("good")).addColumn(FAMILY, FAMILY, 100);
|
||||
List<Row> batches = new ArrayList<>();
|
||||
// the first and second puts will be group by regionserver
|
||||
batches.add(new Put(Bytes.toBytes("fail")).addColumn(FAMILY, null, new byte[CELL_SIZE]));
|
||||
batches.add(new Put(Bytes.toBytes("fail")).addColumn(FAMILY, null, new byte[CELL_SIZE]));
|
||||
// this Increment should succeed
|
||||
batches.add(inc);
|
||||
// this put should succeed
|
||||
batches.add(new Put(Bytes.toBytes("good")).addColumn(FAMILY, null, new byte[1]));
|
||||
Object[] objs = new Object[batches.size()];
|
||||
try (Table table = TEST_UTIL.getConnection().getTable(TABLE_NAME)) {
|
||||
table.batch(batches, objs);
|
||||
fail("Where is the exception? We put the malformed cells!!!");
|
||||
} catch (RetriesExhaustedWithDetailsException e) {
|
||||
assertEquals(2, e.getNumExceptions());
|
||||
for (int i = 0; i != e.getNumExceptions(); ++i) {
|
||||
assertNotNull(e.getCause(i));
|
||||
assertEquals(DoNotRetryIOException.class, e.getCause(i).getClass());
|
||||
assertEquals("fail", Bytes.toString(e.getRow(i).getRow()));
|
||||
}
|
||||
} finally {
|
||||
assertObjects(objs, batches.size());
|
||||
assertTrue(objs[0] instanceof IOException);
|
||||
assertTrue(objs[1] instanceof IOException);
|
||||
assertEquals(Result.class, objs[2].getClass());
|
||||
assertEquals(Result.class, objs[3].getClass());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRowMutations() throws InterruptedException, IOException {
|
||||
Put put = new Put(Bytes.toBytes("good")).addColumn(FAMILY, null, new byte[1]);
|
||||
List<Row> batches = new ArrayList<>();
|
||||
RowMutations mutations = new RowMutations(Bytes.toBytes("fail"));
|
||||
// the first and second puts will be group by regionserver
|
||||
mutations.add(new Put(Bytes.toBytes("fail")).addColumn(FAMILY, null, new byte[CELL_SIZE]));
|
||||
mutations.add(new Put(Bytes.toBytes("fail")).addColumn(FAMILY, null, new byte[CELL_SIZE]));
|
||||
batches.add(mutations);
|
||||
// this bm should succeed
|
||||
mutations = new RowMutations(Bytes.toBytes("good"));
|
||||
mutations.add(put);
|
||||
mutations.add(put);
|
||||
batches.add(mutations);
|
||||
Object[] objs = new Object[batches.size()];
|
||||
try (Table table = TEST_UTIL.getConnection().getTable(TABLE_NAME)) {
|
||||
table.batch(batches, objs);
|
||||
fail("Where is the exception? We put the malformed cells!!!");
|
||||
} catch (RetriesExhaustedWithDetailsException e) {
|
||||
assertEquals(1, e.getNumExceptions());
|
||||
for (int i = 0; i != e.getNumExceptions(); ++i) {
|
||||
assertNotNull(e.getCause(i));
|
||||
assertTrue(e.getCause(i) instanceof IOException);
|
||||
assertEquals("fail", Bytes.toString(e.getRow(i).getRow()));
|
||||
}
|
||||
} finally {
|
||||
assertObjects(objs, batches.size());
|
||||
assertTrue(objs[0] instanceof IOException);
|
||||
assertEquals(Result.class, objs[1].getClass());
|
||||
}
|
||||
}
|
||||
|
||||
private static void assertObjects(Object[] objs, int expectedSize) {
|
||||
int count = 0;
|
||||
for (Object obj : objs) {
|
||||
assertNotNull(obj);
|
||||
++count;
|
||||
}
|
||||
assertEquals(expectedSize, count);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue