From 6923472f75c8a7857712d078aaf054f942297bac Mon Sep 17 00:00:00 2001 From: Chia-Ping Tsai Date: Sun, 11 Feb 2018 03:49:53 +0800 Subject: [PATCH] HBASE-19876 The exception happening in converting pb mutation to hbase.mutation messes up the CellScanner --- .../shaded/protobuf/RequestConverter.java | 4 +- .../hbase/regionserver/RSRpcServices.java | 140 ++++++------ .../client/TestMalformedCellFromClient.java | 203 ++++++++++++++++-- 3 files changed, 263 insertions(+), 84 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java index 8ac705898ad..0afcfe13f07 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java @@ -473,7 +473,7 @@ public final class RequestConverter { return regionActionBuilder; } - private static RegionAction.Builder getRegionActionBuilderWithRegion( + public static RegionAction.Builder getRegionActionBuilderWithRegion( final RegionAction.Builder regionActionBuilder, final byte [] regionName) { RegionSpecifier region = buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName); regionActionBuilder.setRegion(region); @@ -1099,7 +1099,7 @@ public final class RequestConverter { * @return a Condition * @throws IOException */ - private static Condition buildCondition(final byte[] row, final byte[] family, + public static Condition buildCondition(final byte[] row, final byte[] family, final byte[] qualifier, final ByteArrayComparable comparator, final CompareType compareType) throws IOException { Condition.Builder builder = Condition.newBuilder(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index fd28e05d565..07f78613b89 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -555,23 +555,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } } - /** - * Mutate a list of rows atomically. - * @param cellScanner if non-null, the mutation data -- the Cell content. - */ - private void mutateRows(final HRegion region, final OperationQuota quota, - final List actions, final CellScanner cellScanner, - RegionActionResult.Builder builder, final ActivePolicyEnforcement spaceQuotaEnforcement) - throws IOException { - for (ClientProtos.Action action: actions) { - if (action.hasGet()) { - throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" + - action.getGet()); - } - } - doBatchOp(builder, region, quota, actions, cellScanner, spaceQuotaEnforcement, true); - } - /** * Mutate a list of rows atomically. * @param cellScanner if non-null, the mutation data -- the Cell content. @@ -580,46 +563,56 @@ public class RSRpcServices implements HBaseRPCErrorHandler, final CellScanner cellScanner, byte[] row, byte[] family, byte[] qualifier, CompareOperator op, ByteArrayComparable comparator, RegionActionResult.Builder builder, ActivePolicyEnforcement spaceQuotaEnforcement) throws IOException { - if (!region.getRegionInfo().isMetaRegion()) { - regionServer.cacheFlusher.reclaimMemStoreMemory(); - } - RowMutations rm = null; - int i = 0; - ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder = + int countOfCompleteMutation = 0; + try { + if (!region.getRegionInfo().isMetaRegion()) { + regionServer.cacheFlusher.reclaimMemStoreMemory(); + } + RowMutations rm = null; + int i = 0; + ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder = ClientProtos.ResultOrException.newBuilder(); - for (ClientProtos.Action action: actions) { - if (action.hasGet()) { - throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" + + for (ClientProtos.Action action: actions) { + if (action.hasGet()) { + throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" + action.getGet()); - } - MutationType type = action.getMutation().getMutateType(); - if (rm == null) { - rm = new RowMutations(action.getMutation().getRow().toByteArray(), actions.size()); - } - switch (type) { - case PUT: - Put put = ProtobufUtil.toPut(action.getMutation(), cellScanner); - checkCellSizeLimit(region, put); - spaceQuotaEnforcement.getPolicyEnforcement(region).check(put); - rm.add(put); - break; - case DELETE: - Delete del = ProtobufUtil.toDelete(action.getMutation(), cellScanner); - spaceQuotaEnforcement.getPolicyEnforcement(region).check(del); - rm.add(del); - break; - default: - throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name()); - } - // To unify the response format with doNonAtomicRegionMutation and read through client's - // AsyncProcess we have to add an empty result instance per operation - resultOrExceptionOrBuilder.clear(); - resultOrExceptionOrBuilder.setIndex(i++); - builder.addResultOrException( + } + MutationType type = action.getMutation().getMutateType(); + if (rm == null) { + rm = new RowMutations(action.getMutation().getRow().toByteArray(), actions.size()); + } + switch (type) { + case PUT: + Put put = ProtobufUtil.toPut(action.getMutation(), cellScanner); + ++countOfCompleteMutation; + checkCellSizeLimit(region, put); + spaceQuotaEnforcement.getPolicyEnforcement(region).check(put); + rm.add(put); + break; + case DELETE: + Delete del = ProtobufUtil.toDelete(action.getMutation(), cellScanner); + ++countOfCompleteMutation; + spaceQuotaEnforcement.getPolicyEnforcement(region).check(del); + rm.add(del); + break; + default: + throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name()); + } + // To unify the response format with doNonAtomicRegionMutation and read through client's + // AsyncProcess we have to add an empty result instance per operation + resultOrExceptionOrBuilder.clear(); + resultOrExceptionOrBuilder.setIndex(i++); + builder.addResultOrException( resultOrExceptionOrBuilder.build()); + } + return region.checkAndRowMutate(row, family, qualifier, op, comparator, rm); + } finally { + // Currently, the checkAndMutate isn't supported by batch so it won't mess up the cell scanner + // even if the malformed cells are not skipped. + for (int i = countOfCompleteMutation; i < actions.size(); ++i) { + skipCellsForMutation(actions.get(i), cellScanner); + } } - return region.checkAndRowMutate(row, family, qualifier, op, - comparator, rm); } /** @@ -786,9 +779,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, context.incrementResponseExceptionSize(pair.getSerializedSize()); resultOrExceptionBuilder.setIndex(action.getIndex()); builder.addResultOrException(resultOrExceptionBuilder.build()); - if (cellScanner != null) { - skipCellsForMutation(action, cellScanner); - } + skipCellsForMutation(action, cellScanner); continue; } if (action.hasGet()) { @@ -895,6 +886,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, try { doBatchOp(builder, region, quota, mutations, cellScanner, spaceQuotaEnforcement, false); } catch (IOException ioe) { + // TODO do the refactor to avoid this catch as it is useless + // doBatchOp has handled the IOE for all non-atomic operations. rpcServer.getMetrics().exception(ioe); NameBytesPair pair = ResponseConverter.buildException(ioe); resultOrExceptionBuilder.setException(pair); @@ -946,6 +939,10 @@ public class RSRpcServices implements HBaseRPCErrorHandler, Map mutationActionMap = new HashMap<>(); int i = 0; for (ClientProtos.Action action: mutations) { + if (action.hasGet()) { + throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" + + action.getGet()); + } MutationProto m = action.getMutation(); Mutation mutation; if (m.getMutateType() == MutationType.PUT) { @@ -968,8 +965,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } // HBASE-17924 - // Sort to improve lock efficiency for non-atomic batch of operations. If atomic (mostly - // called from mutateRows()), order is preserved as its expected from the client + // Sort to improve lock efficiency for non-atomic batch of operations. If atomic + // order is preserved as its expected from the client if (!atomic) { Arrays.sort(mArray, (v1, v2) -> Row.COMPARATOR.compare(v1, v2)); } @@ -1004,12 +1001,19 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } } } catch (IOException ie) { + int processedMutationIndex = 0; + for (Action mutation : mutations) { + // The non-null mArray[i] means the cell scanner has been read. + if (mArray[processedMutationIndex++] == null) { + skipCellsForMutation(mutation, cells); + } + if (!atomic) { + builder.addResultOrException(getResultOrException(ie, mutation.getIndex())); + } + } if (atomic) { throw ie; } - for (Action mutation : mutations) { - builder.addResultOrException(getResultOrException(ie, mutation.getIndex())); - } } if (regionServer.metricsRegionServer != null) { long after = EnvironmentEdgeManager.currentTime(); @@ -2549,9 +2553,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, // All Mutations in this RegionAction not executed as we can not see the Region online here // in this RS. Will be retried from Client. Skipping all the Cells in CellScanner // corresponding to these Mutations. - if (cellScanner != null) { - skipCellsForMutations(regionAction.getActionList(), cellScanner); - } + skipCellsForMutations(regionAction.getActionList(), cellScanner); continue; // For this region it's a failure. } @@ -2572,8 +2574,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, cellScanner, row, family, qualifier, op, comparator, regionActionResultBuilder, spaceQuotaEnforcement); } else { - mutateRows(region, quota, regionAction.getActionList(), cellScanner, - regionActionResultBuilder, spaceQuotaEnforcement); + doBatchOp(regionActionResultBuilder, region, quota, regionAction.getActionList(), + cellScanner, spaceQuotaEnforcement, true); processed = Boolean.TRUE; } } catch (IOException e) { @@ -2620,12 +2622,18 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } private void skipCellsForMutations(List actions, CellScanner cellScanner) { + if (cellScanner == null) { + return; + } for (Action action : actions) { skipCellsForMutation(action, cellScanner); } } private void skipCellsForMutation(Action action, CellScanner cellScanner) { + if (cellScanner == null) { + return; + } try { if (action.hasMutation()) { MutationProto m = action.getMutation(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMalformedCellFromClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMalformedCellFromClient.java index e44a2e91d98..6b57b89a2d0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMalformedCellFromClient.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMalformedCellFromClient.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.client; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -29,11 +30,15 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.filter.BinaryComparator; +import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; @@ -44,23 +49,21 @@ import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.mockito.Mockito; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; /** - * The purpose of this test is to make sure the region exception won't corrupt the results - * of batch. The prescription is shown below. - * 1) honor the action result rather than region exception. If the action have both of true result - * and region exception, the action is fine as the exception is caused by other actions - * which are in the same region. - * 2) honor the action exception rather than region exception. If the action have both of action - * exception and region exception, we deal with the action exception only. If we also - * handle the region exception for the same action, it will introduce the negative count of - * actions in progress. The AsyncRequestFuture#waitUntilDone will block forever. - * - * The no-cluster test is in TestAsyncProcessWithRegionException. + * The purpose of this test is to ensure whether rs deals with the malformed cells correctly. */ @Category({ MediumTests.class, ClientTests.class }) public class TestMalformedCellFromClient { - + private static final Logger LOG = LoggerFactory.getLogger(TestMalformedCellFromClient.class); @ClassRule public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestMalformedCellFromClient.class); @@ -139,10 +142,16 @@ public class TestMalformedCellFromClient { } /** - * The purpose of this ut is to check the consistency between the exception and results. - * If the RetriesExhaustedWithDetailsException contains the whole batch, - * each result should be of IOE. Otherwise, the row operation which is not in the exception - * should have a true result. + * This test verifies region exception doesn't corrupt the results of batch. The prescription is + * shown below. 1) honor the action result rather than region exception. If the action have both + * of true result and region exception, the action is fine as the exception is caused by other + * actions which are in the same region. 2) honor the action exception rather than region + * exception. If the action have both of action exception and region exception, we deal with the + * action exception only. If we also handle the region exception for the same action, it will + * introduce the negative count of actions in progress. The AsyncRequestFuture#waitUntilDone will + * block forever. If the RetriesExhaustedWithDetailsException contains the whole batch, each + * result should be of IOE. Otherwise, the row operation which is not in the exception should have + * a true result. The no-cluster test is in TestAsyncProcessWithRegionException. */ @Test public void testRegionExceptionByAsync() throws Exception { @@ -170,4 +179,166 @@ public class TestMalformedCellFromClient { assertTrue(Bytes.equals(CellUtil.cloneValue(cell), new byte[10])); } } + + /** + * The invalid cells is in rm. The rm should fail but the subsequent mutations should succeed. + * Currently, we have no client api to submit the request consisting of condition-rm and mutation. + * Hence, this test build the request manually. + */ + @Test + public void testAtomicOperations() throws Exception { + RowMutations rm = new RowMutations(Bytes.toBytes("fail")); + rm.add(new Put(rm.getRow()).addColumn(FAMILY, null, new byte[CELL_SIZE])); + rm.add(new Put(rm.getRow()).addColumn(FAMILY, null, new byte[10])); + Put put = new Put(Bytes.toBytes("good")).addColumn(FAMILY, null, new byte[10]); + + // build the request + HRegion r = TEST_UTIL.getMiniHBaseCluster().getRegions(TABLE_NAME).get(0); + ClientProtos.MultiRequest request = + ClientProtos.MultiRequest.newBuilder(createRequest(rm, r.getRegionInfo().getRegionName())) + .addRegionAction(ClientProtos.RegionAction.newBuilder().setRegion(RequestConverter + .buildRegionSpecifier(HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME, + r.getRegionInfo().getRegionName())).addAction(ClientProtos.Action.newBuilder() + .setMutation( + ProtobufUtil.toMutationNoData(ClientProtos.MutationProto.MutationType.PUT, put)))) + .build(); + + List cells = new ArrayList<>(); + for (Mutation m : rm.getMutations()) { + cells.addAll(m.getCellList(FAMILY)); + } + cells.addAll(put.getCellList(FAMILY)); + assertEquals(3, cells.size()); + HBaseRpcController controller = Mockito.mock(HBaseRpcController.class); + Mockito.when(controller.cellScanner()).thenReturn(CellUtil.createCellScanner(cells)); + HRegionServer rs = TEST_UTIL.getMiniHBaseCluster().getRegionServer( + TEST_UTIL.getMiniHBaseCluster() + .getServerHoldingRegion(TABLE_NAME, r.getRegionInfo().getRegionName())); + + ClientProtos.MultiResponse response = rs.getRSRpcServices().multi(controller, request); + assertEquals(2, response.getRegionActionResultCount()); + assertTrue(response.getRegionActionResultList().get(0).hasException()); + assertFalse(response.getRegionActionResultList().get(1).hasException()); + assertEquals(1, response.getRegionActionResultList().get(1).getResultOrExceptionCount()); + assertTrue( + response.getRegionActionResultList().get(1).getResultOrExceptionList().get(0).hasResult()); + try (Table table = TEST_UTIL.getConnection().getTable(TABLE_NAME)) { + Result result = table.get(new Get(Bytes.toBytes("good"))); + assertEquals(1, result.size()); + Cell cell = result.getColumnLatestCell(FAMILY, null); + assertTrue(Bytes.equals(CellUtil.cloneValue(cell), new byte[10])); + } + } + + private static ClientProtos.MultiRequest createRequest(RowMutations rm, byte[] regionName) + throws IOException { + ClientProtos.RegionAction.Builder builder = RequestConverter + .getRegionActionBuilderWithRegion(ClientProtos.RegionAction.newBuilder(), regionName); + builder.setAtomic(true); + ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder(); + ClientProtos.MutationProto.Builder mutationBuilder = ClientProtos.MutationProto.newBuilder(); + ClientProtos.Condition condition = RequestConverter + .buildCondition(rm.getRow(), FAMILY, null, new BinaryComparator(new byte[10]), + HBaseProtos.CompareType.EQUAL); + for (Mutation mutation : rm.getMutations()) { + ClientProtos.MutationProto.MutationType mutateType = null; + if (mutation instanceof Put) { + mutateType = ClientProtos.MutationProto.MutationType.PUT; + } else if (mutation instanceof Delete) { + mutateType = ClientProtos.MutationProto.MutationType.DELETE; + } else { + throw new DoNotRetryIOException( + "RowMutations supports only put and delete, not " + mutation.getClass().getName()); + } + mutationBuilder.clear(); + ClientProtos.MutationProto mp = + ProtobufUtil.toMutationNoData(mutateType, mutation, mutationBuilder); + actionBuilder.clear(); + actionBuilder.setMutation(mp); + builder.addAction(actionBuilder.build()); + } + ClientProtos.MultiRequest request = + ClientProtos.MultiRequest.newBuilder().addRegionAction(builder.build()) + .setCondition(condition).build(); + return request; + } + + /** + * This test depends on how regionserver process the batch ops. + * 1) group the put/delete until meeting the increment + * 2) process the batch of put/delete + * 3) process the increment + * see RSRpcServices#doNonAtomicRegionMutation + */ + @Test + public void testNonAtomicOperations() throws InterruptedException, IOException { + Increment inc = new Increment(Bytes.toBytes("good")).addColumn(FAMILY, null, 100); + List batches = new ArrayList<>(); + // the first and second puts will be group by regionserver + batches.add(new Put(Bytes.toBytes("fail")).addColumn(FAMILY, null, new byte[CELL_SIZE])); + batches.add(new Put(Bytes.toBytes("fail")).addColumn(FAMILY, null, new byte[CELL_SIZE])); + // this Increment should succeed + batches.add(inc); + // this put should succeed + batches.add(new Put(Bytes.toBytes("good")).addColumn(FAMILY, null, new byte[1])); + Object[] objs = new Object[batches.size()]; + try (Table table = TEST_UTIL.getConnection().getTable(TABLE_NAME)) { + table.batch(batches, objs); + fail("Where is the exception? We put the malformed cells!!!"); + } catch (RetriesExhaustedWithDetailsException e) { + assertEquals(2, e.getNumExceptions()); + for (int i = 0; i != e.getNumExceptions(); ++i) { + assertNotNull(e.getCause(i)); + assertEquals(DoNotRetryIOException.class, e.getCause(i).getClass()); + assertEquals("fail", Bytes.toString(e.getRow(i).getRow())); + } + } finally { + assertObjects(objs, batches.size()); + assertTrue(objs[0] instanceof IOException); + assertTrue(objs[1] instanceof IOException); + assertEquals(Result.class, objs[2].getClass()); + assertEquals(Result.class, objs[3].getClass()); + } + } + + @Test + public void testRowMutations() throws InterruptedException, IOException { + Put put = new Put(Bytes.toBytes("good")).addColumn(FAMILY, null, new byte[1]); + List batches = new ArrayList<>(); + RowMutations mutations = new RowMutations(Bytes.toBytes("fail")); + // the first and second puts will be group by regionserver + mutations.add(new Put(Bytes.toBytes("fail")).addColumn(FAMILY, null, new byte[CELL_SIZE])); + mutations.add(new Put(Bytes.toBytes("fail")).addColumn(FAMILY, null, new byte[CELL_SIZE])); + batches.add(mutations); + // this bm should succeed + mutations = new RowMutations(Bytes.toBytes("good")); + mutations.add(put); + mutations.add(put); + batches.add(mutations); + Object[] objs = new Object[batches.size()]; + try (Table table = TEST_UTIL.getConnection().getTable(TABLE_NAME)) { + table.batch(batches, objs); + fail("Where is the exception? We put the malformed cells!!!"); + } catch (RetriesExhaustedWithDetailsException e) { + assertEquals(1, e.getNumExceptions()); + for (int i = 0; i != e.getNumExceptions(); ++i) { + assertNotNull(e.getCause(i)); + assertTrue(e.getCause(i) instanceof IOException); + assertEquals("fail", Bytes.toString(e.getRow(i).getRow())); + } + } finally { + assertObjects(objs, batches.size()); + assertTrue(objs[0] instanceof IOException); + assertEquals(Result.class, objs[1].getClass()); + } + } + + private static void assertObjects(Object[] objs, int expectedSize) { + int count = 0; + for (Object obj : objs) { + assertNotNull(obj); + ++count; + } + assertEquals(expectedSize, count); + } }