From d05a3722c8347363eb04a3e5457d13ae5d0d6de6 Mon Sep 17 00:00:00 2001 From: Enis Soztutar Date: Tue, 7 Jun 2016 20:12:51 -0700 Subject: [PATCH] HBASE-15600 Add provision for adding mutations to memstore or able to write to same region in batchMutate coprocessor hooks (Rajeshbabu and Enis) --- .../hadoop/hbase/regionserver/HRegion.java | 76 ++++- .../MiniBatchOperationInProgress.java | 29 +- .../MultiRowMutationProcessor.java | 2 +- ...verForAddingMutationsFromCoprocessors.java | 282 ++++++++++++++++++ 4 files changed, 370 insertions(+), 19 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverForAddingMutationsFromCoprocessors.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 6522fdeaa98..9c966cd48af 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -3134,6 +3134,35 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive); if (coprocessorHost.preBatchMutate(miniBatchOp)) { return 0L; + } else { + for (int i = firstIndex; i < lastIndexExclusive; i++) { + if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.NOT_RUN) { + // Status is no longer NOT_RUN, so skip it; lastIndexExclusive was incremented above. + continue; + } + // we pass (i - firstIndex) below since the call expects a relative index + Mutation[] cpMutations = miniBatchOp.getOperationsFromCoprocessors(i - firstIndex); + if (cpMutations == null) { + continue; + } + // Else Coprocessor added more Mutations corresponding to the Mutation at this index. 
+ for (int j = 0; j < cpMutations.length; j++) { + Mutation cpMutation = cpMutations[j]; + Map<byte[], List<Cell>> cpFamilyMap = cpMutation.getFamilyCellMap(); + checkAndPrepareMutation(cpMutation, replay, cpFamilyMap, now); + + // Acquire the row lock for the coprocessor-added mutation. If not, the whole batch will fail. + acquiredRowLocks.add(getRowLockInternal(cpMutation.getRow(), true)); + + if (cpMutation.getDurability() == Durability.SKIP_WAL) { + recordMutationWithoutWal(cpFamilyMap); + } + + // Returned mutations from coprocessor correspond to the Mutation at index i. We can + // directly add the cells from those mutations to the familyMaps of this mutation. + mergeFamilyMaps(familyMaps[i], cpFamilyMap); // will get added to the memstore later + } + } } } @@ -3310,9 +3339,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // call the coprocessor hook to do any finalization steps // after the put is done MiniBatchOperationInProgress<Mutation> miniBatchOp = - new MiniBatchOperationInProgress<Mutation>(batchOp.getMutationsForCoprocs(), - batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, - lastIndexExclusive); + new MiniBatchOperationInProgress<Mutation>(batchOp.getMutationsForCoprocs(), + batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive); coprocessorHost.postBatchMutateIndispensably(miniBatchOp, success); } @@ -3320,6 +3348,18 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } + private void mergeFamilyMaps(Map<byte[], List<Cell>> familyMap, + Map<byte[], List<Cell>> toBeMerged) { + for (Map.Entry<byte[], List<Cell>> entry : toBeMerged.entrySet()) { + List<Cell> cells = familyMap.get(entry.getKey()); + if (cells == null) { + familyMap.put(entry.getKey(), entry.getValue()); + } else { + cells.addAll(entry.getValue()); + } + } + } + private void appendCurrentNonces(final Mutation mutation, final boolean replay, final WALEdit walEdit, final long now, final long currentNonceGroup, final long currentNonce) throws IOException { @@ -3348,18 +3388,7 @@ public class HRegion implements 
HeapSize, PropagatingConfigurationObserver, Regi familyMaps[lastIndexExclusive] = familyMap; try { - if (mutation instanceof Put) { - // Check the families in the put. If bad, skip this one. - if (batchOp.isInReplay()) { - removeNonExistentColumnFamilyForReplay(familyMap); - } else { - checkFamilies(familyMap.keySet()); - } - checkTimestamps(mutation.getFamilyCellMap(), now); - } else { - prepareDelete((Delete)mutation); - } - checkRow(mutation.getRow(), "doMiniBatchMutation"); + checkAndPrepareMutation(mutation, batchOp.isInReplay(), familyMap, now); } catch (NoSuchColumnFamilyException nscf) { LOG.warn("No such column family in batch mutation", nscf); batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus( @@ -3379,6 +3408,23 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return skip; } + private void checkAndPrepareMutation(Mutation mutation, boolean replay, + final Map<byte[], List<Cell>> familyMap, final long now) + throws IOException { + if (mutation instanceof Put) { + // Check the families in the put. If bad, skip this one. 
+ if (replay) { + removeNonExistentColumnFamilyForReplay(familyMap); + } else { + checkFamilies(familyMap.keySet()); + } + checkTimestamps(mutation.getFamilyCellMap(), now); + } else { + prepareDelete((Delete)mutation); + } + checkRow(mutation.getRow(), "doMiniBatchMutation"); + } + /** * During replay, there could exist column families which are removed between region server * failure and replay diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MiniBatchOperationInProgress.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MiniBatchOperationInProgress.java index 2b12dec6bd3..cdbecacff2e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MiniBatchOperationInProgress.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MiniBatchOperationInProgress.java @@ -18,20 +18,22 @@ package org.apache.hadoop.hbase.regionserver; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; /** * Wraps together the mutations which are applied as a batch to the region and their operation - * status and WALEdits. + * status and WALEdits. * @see org.apache.hadoop.hbase.coprocessor.RegionObserver#preBatchMutate( * ObserverContext, MiniBatchOperationInProgress) * @see org.apache.hadoop.hbase.coprocessor.RegionObserver#postBatchMutate( * ObserverContext, MiniBatchOperationInProgress) * @param T Pair<Mutation, Integer> pair of Mutations and associated rowlock ids . 
*/ -@InterfaceAudience.Private +@InterfaceAudience.LimitedPrivate("Coprocessor") public class MiniBatchOperationInProgress<T> { private final T[] operations; + private Mutation[][] operationsFromCoprocessors; private final OperationStatus[] retCodeDetails; private final WALEdit[] walEditsFromCoprocessors; private final int firstIndex; @@ -63,7 +65,7 @@ public class MiniBatchOperationInProgress<T> { /** * Sets the status code for the operation(Mutation) at the specified position. - * By setting this status, {@link org.apache.hadoop.hbase.coprocessor.RegionObserver} + * By setting this status, {@link org.apache.hadoop.hbase.coprocessor.RegionObserver} * can make HRegion to skip Mutations. * @param index * @param opStatus @@ -103,4 +105,25 @@ public class MiniBatchOperationInProgress<T> { } return this.firstIndex + index; } + + /** + * Add more Mutations corresponding to the Mutation at the given index to be committed atomically + * in the same batch. These mutations are applied to the WAL and applied to the memstore as well. + * The timestamp of the cells in the given Mutations MUST be obtained from the original mutation. + * + * @param index the index, relative to the start of this mini batch, of the original mutation + * @param newOperations the Mutations to add + */ + public void addOperationsFromCP(int index, Mutation[] newOperations) { + if (this.operationsFromCoprocessors == null) { + // lazy allocation to save on object allocation in case this is not used + this.operationsFromCoprocessors = new Mutation[operations.length][]; + } + this.operationsFromCoprocessors[getAbsoluteIndex(index)] = newOperations; + } + + public Mutation[] getOperationsFromCoprocessors(int index) { + return operationsFromCoprocessors == null ? 
null : + operationsFromCoprocessors[getAbsoluteIndex(index)]; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiRowMutationProcessor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiRowMutationProcessor.java index 1947a1b4a08..995ea933f55 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiRowMutationProcessor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiRowMutationProcessor.java @@ -60,7 +60,7 @@ MultiRowMutationProcessorResponse> { public boolean readOnly() { return false; } - + @Override public MultiRowMutationProcessorResponse getResult() { return MultiRowMutationProcessorResponse.getDefaultInstance(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverForAddingMutationsFromCoprocessors.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverForAddingMutationsFromCoprocessors.java new file mode 100644 index 00000000000..98e930a974a --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverForAddingMutationsFromCoprocessors.java @@ -0,0 +1,282 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.coprocessor; + +import static org.junit.Assert.*; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; +import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.wal.WALKey; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +import com.google.common.collect.Lists; + +@Category(MediumTests.class) +public class TestRegionObserverForAddingMutationsFromCoprocessors { + + private static final Log LOG + = LogFactory.getLog(TestRegionObserverForAddingMutationsFromCoprocessors.class); + + private static HBaseTestingUtility util; + private static final byte[] dummy = Bytes.toBytes("dummy"); + private static final byte[] row1 = 
Bytes.toBytes("r1"); + private static final byte[] row2 = Bytes.toBytes("r2"); + private static final byte[] row3 = Bytes.toBytes("r3"); + private static final byte[] test = Bytes.toBytes("test"); + + @Rule + public TestName name = new TestName(); + private TableName tableName; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + Configuration conf = HBaseConfiguration.create(); + conf.set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY, TestWALObserver.class.getName()); + util = new HBaseTestingUtility(conf); + util.startMiniCluster(); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + util.shutdownMiniCluster(); + } + + @Before + public void setUp() throws Exception { + tableName = TableName.valueOf(name.getMethodName()); + } + + private void createTable(String coprocessor) throws IOException { + HTableDescriptor htd = new HTableDescriptor(tableName) + .addFamily(new HColumnDescriptor(dummy)) + .addFamily(new HColumnDescriptor(test)) + .addCoprocessor(coprocessor); + util.getAdmin().createTable(htd); + } + + /** + * Tests that Puts a coprocessor adds in preBatchMutate are applied together with the client Put. + * @throws Exception + */ + @Test + public void testMulti() throws Exception { + createTable(TestMultiMutationCoprocessor.class.getName()); + + try (Table t = util.getConnection().getTable(tableName)) { + t.put(new Put(row1).addColumn(test, dummy, dummy)); + assertRowCount(t, 3); + } + } + + /** + * Tests that added mutations from coprocessors end up in the WAL. 
+ */ + @Test + public void testCPMutationsAreWrittenToWALEdit() throws Exception { + createTable(TestMultiMutationCoprocessor.class.getName()); + + try (Table t = util.getConnection().getTable(tableName)) { + t.put(new Put(row1).addColumn(test, dummy, dummy)); + assertRowCount(t, 3); + } + + assertNotNull(TestWALObserver.savedEdit); + assertEquals(4, TestWALObserver.savedEdit.getCells().size()); + } + + private static void assertRowCount(Table t, int expected) throws IOException { + try (ResultScanner scanner = t.getScanner(new Scan())) { + int i = 0; + for (Result r: scanner) { + LOG.info(r.toString()); + i++; + } + assertEquals(expected, i); + } + } + + @Test + public void testDeleteCell() throws Exception { + createTable(TestDeleteCellCoprocessor.class.getName()); + + try (Table t = util.getConnection().getTable(tableName)) { + t.put(Lists.newArrayList( + new Put(row1).addColumn(test, dummy, dummy), + new Put(row2).addColumn(test, dummy, dummy), + new Put(row3).addColumn(test, dummy, dummy) + )); + + assertRowCount(t, 3); + + t.delete(new Delete(test).addColumn(test, dummy)); // delete non-existing row; the coprocessor adds deletes for row1 and row2 + assertRowCount(t, 1); + } + } + + @Test + public void testDeleteFamily() throws Exception { + createTable(TestDeleteFamilyCoprocessor.class.getName()); + + try (Table t = util.getConnection().getTable(tableName)) { + t.put(Lists.newArrayList( + new Put(row1).addColumn(test, dummy, dummy), + new Put(row2).addColumn(test, dummy, dummy), + new Put(row3).addColumn(test, dummy, dummy) + )); + + assertRowCount(t, 3); + + t.delete(new Delete(test).addFamily(test)); // delete non-existing row; the coprocessor adds deletes for row1 and row2 + assertRowCount(t, 1); + } + } + + @Test + public void testDeleteRow() throws Exception { + createTable(TestDeleteRowCoprocessor.class.getName()); + + try (Table t = util.getConnection().getTable(tableName)) { + t.put(Lists.newArrayList( + new Put(row1).addColumn(test, dummy, dummy), + new Put(row2).addColumn(test, dummy, dummy), + new Put(row3).addColumn(test, dummy, dummy) + )); 
+ + assertRowCount(t, 3); + + t.delete(new Delete(test).addColumn(test, dummy)); // delete non-existing row; the coprocessor adds deletes for row1 and row2 + assertRowCount(t, 1); + } + } + + public static class TestMultiMutationCoprocessor extends BaseRegionObserver { + @Override + public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, + MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { + Mutation mut = miniBatchOp.getOperation(0); + List<Cell> cells = mut.getFamilyCellMap().get(test); + Put[] puts = new Put[] { + new Put(row1).addColumn(test, dummy, cells.get(0).getTimestamp(), + Bytes.toBytes("cpdummy")), + new Put(row2).addColumn(test, dummy, cells.get(0).getTimestamp(), dummy), + new Put(row3).addColumn(test, dummy, cells.get(0).getTimestamp(), dummy), + }; + LOG.info("Putting:" + Arrays.toString(puts)); + miniBatchOp.addOperationsFromCP(0, puts); + } + } + + public static class TestDeleteCellCoprocessor extends BaseRegionObserver { + @Override + public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, + MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { + Mutation mut = miniBatchOp.getOperation(0); + + if (mut instanceof Delete) { + List<Cell> cells = mut.getFamilyCellMap().get(test); + Delete[] deletes = new Delete[] { + // delete only 2 rows + new Delete(row1).addColumns(test, dummy, cells.get(0).getTimestamp()), + new Delete(row2).addColumns(test, dummy, cells.get(0).getTimestamp()), + }; + LOG.info("Deleting:" + Arrays.toString(deletes)); + miniBatchOp.addOperationsFromCP(0, deletes); + } + } + } + + public static class TestDeleteFamilyCoprocessor extends BaseRegionObserver { + @Override + public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, + MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { + Mutation mut = miniBatchOp.getOperation(0); + + if (mut instanceof Delete) { + List<Cell> cells = mut.getFamilyCellMap().get(test); + Delete[] deletes = new Delete[] { + // delete only 2 rows + new Delete(row1).addFamily(test, cells.get(0).getTimestamp()), + new Delete(row2).addFamily(test, cells.get(0).getTimestamp()), + }; + 
LOG.info("Deleting:" + Arrays.toString(deletes)); + miniBatchOp.addOperationsFromCP(0, deletes); + } + } + } + + public static class TestDeleteRowCoprocessor extends BaseRegionObserver { + @Override + public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, + MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { + Mutation mut = miniBatchOp.getOperation(0); + + if (mut instanceof Delete) { + List<Cell> cells = mut.getFamilyCellMap().get(test); + Delete[] deletes = new Delete[] { + // delete only 2 rows + new Delete(row1, cells.get(0).getTimestamp()), + new Delete(row2, cells.get(0).getTimestamp()), + }; + LOG.info("Deleting:" + Arrays.toString(deletes)); + miniBatchOp.addOperationsFromCP(0, deletes); + } + } + } + + public static class TestWALObserver extends BaseWALObserver { + static WALEdit savedEdit = null; + @Override + public void postWALWrite(ObserverContext<? extends WALCoprocessorEnvironment> ctx, + HRegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException { + if (info.getTable().equals(TableName.valueOf("testCPMutationsAreWrittenToWALEdit"))) { + savedEdit = logEdit; + } + super.postWALWrite(ctx, info, logKey, logEdit); + } + } +}