diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 2e2ecfea433..8eb16a6c2f5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -3079,18 +3079,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } try { - if (isPutMutation) { - // Check the families in the put. If bad, skip this one. - if (isInReplay) { - removeNonExistentColumnFamilyForReplay(familyMap); - } else { - checkFamilies(familyMap.keySet()); - } - checkTimestamps(mutation.getFamilyCellMap(), now); - } else { - prepareDelete((Delete) mutation); - } - checkRow(mutation.getRow(), "doMiniBatchMutation"); + checkAndPrepareMutation(mutation, batchOp.isInReplay(), familyMap, now); } catch (NoSuchColumnFamilyException nscf) { LOG.warn("No such column family in batch mutation", nscf); batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus( @@ -3198,7 +3187,38 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi MiniBatchOperationInProgress miniBatchOp = new MiniBatchOperationInProgress(batchOp.getMutationsForCoprocs(), batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive); - if (coprocessorHost.preBatchMutate(miniBatchOp)) return 0L; + if (coprocessorHost.preBatchMutate(miniBatchOp)) { + return 0L; + } else { + for (int i = firstIndex; i < lastIndexExclusive; i++) { + if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.NOT_RUN) { + // Skip operations whose status is no longer NOT_RUN; lastIndexExclusive was incremented above. 
+ continue; + } + // we pass (i - firstIndex) below since the call expects a relative index + Mutation[] cpMutations = miniBatchOp.getOperationsFromCoprocessors(i - firstIndex); + if (cpMutations == null) { + continue; + } + // Else Coprocessor added more Mutations corresponding to the Mutation at this index. + for (int j = 0; j < cpMutations.length; j++) { + Mutation cpMutation = cpMutations[j]; + Map<byte[], List<Cell>> cpFamilyMap = cpMutation.getFamilyCellMap(); + checkAndPrepareMutation(cpMutation, isInReplay, cpFamilyMap, now); + + // Acquire row locks. If not, the whole batch will fail. + acquiredRowLocks.add(getRowLockInternal(cpMutation.getRow(), true)); + + if (cpMutation.getDurability() == Durability.SKIP_WAL) { + recordMutationWithoutWal(cpFamilyMap); + } + + // Returned mutations from coprocessor correspond to the Mutation at index i. We can + // directly add the cells from those mutations to the familyMaps of this mutation. + mergeFamilyMaps(familyMaps[i], cpFamilyMap); // will get added to the memstore later + } + } + } } // ------------------------------------ @@ -3422,9 +3442,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // call the coprocessor hook to do any finalization steps // after the put is done MiniBatchOperationInProgress miniBatchOp = - new MiniBatchOperationInProgress(batchOp.getMutationsForCoprocs(), - batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, - lastIndexExclusive); + new MiniBatchOperationInProgress(batchOp.getMutationsForCoprocs(), + batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive); coprocessorHost.postBatchMutateIndispensably(miniBatchOp, success); } @@ -3432,6 +3451,25 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } } + /** + * Merges the cells of {@code toBeMerged} into {@code familyMap}, family by family. + * A family missing from {@code familyMap} adopts the incoming list itself (no copy). + * + * @param familyMap the map that receives the cells; modified in place + * @param toBeMerged the map whose cells are added + */ + private void mergeFamilyMaps(Map<byte[], List<Cell>> familyMap, + Map<byte[], List<Cell>> toBeMerged) { + for (Map.Entry<byte[], List<Cell>> entry : toBeMerged.entrySet()) { + List<Cell> cells = familyMap.get(entry.getKey()); + if (cells == null) { 
+ familyMap.put(entry.getKey(), entry.getValue()); + } else { + cells.addAll(entry.getValue()); + } + } + } + /** * Returns effective durability from the passed durability and * the table descriptor. @@ -3861,6 +3899,33 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } + /** + * Validates one Put or Delete before it is applied as part of a mini-batch. + * + * @param mutation the mutation to check; anything that is not a Put is cast to Delete + * @param replay true to drop non-existent families of a Put from {@code familyMap} rather than + * run the family check + * @param familyMap family-to-cells map whose families are checked + * @param now current time passed to the timestamp check + * @throws IOException propagated from the family, timestamp, delete or row checks + */ + private void checkAndPrepareMutation(Mutation mutation, boolean replay, + final Map<byte[], List<Cell>> familyMap, final long now) + throws IOException { + if (mutation instanceof Put) { + // Check the families in the put. If bad, skip this one. + if (replay) { + removeNonExistentColumnFamilyForReplay(familyMap); + } else { + checkFamilies(familyMap.keySet()); + } + checkTimestamps(mutation.getFamilyCellMap(), now); + } else { + prepareDelete((Delete)mutation); + } + checkRow(mutation.getRow(), "doMiniBatchMutation"); + } + /** * During replay, there could exist column families which are removed between region server * failure and replay diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MiniBatchOperationInProgress.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MiniBatchOperationInProgress.java index 2b12dec6bd3..cdbecacff2e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MiniBatchOperationInProgress.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MiniBatchOperationInProgress.java @@ -18,20 +18,22 @@ package org.apache.hadoop.hbase.regionserver; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; /** * Wraps together the mutations which are applied as a batch to the region and their operation - * status and WALEdits. + * status and WALEdits. 
* @see org.apache.hadoop.hbase.coprocessor.RegionObserver#preBatchMutate( * ObserverContext, MiniBatchOperationInProgress) * @see org.apache.hadoop.hbase.coprocessor.RegionObserver#postBatchMutate( * ObserverContext, MiniBatchOperationInProgress) * @param T Pair<Mutation, Integer> pair of Mutations and associated rowlock ids . */ -@InterfaceAudience.Private +@InterfaceAudience.LimitedPrivate("Coprocessors") public class MiniBatchOperationInProgress<T> { private final T[] operations; + private Mutation[][] operationsFromCoprocessors; private final OperationStatus[] retCodeDetails; private final WALEdit[] walEditsFromCoprocessors; private final int firstIndex; @@ -63,7 +65,7 @@ public class MiniBatchOperationInProgress<T> { /** * Sets the status code for the operation(Mutation) at the specified position. - * By setting this status, {@link org.apache.hadoop.hbase.coprocessor.RegionObserver} + * By setting this status, {@link org.apache.hadoop.hbase.coprocessor.RegionObserver} * can make HRegion to skip Mutations. * @param index * @param opStatus @@ -103,4 +105,31 @@ public class MiniBatchOperationInProgress<T> { } return this.firstIndex + index; } + + /** + * Add more Mutations corresponding to the Mutation at the given index to be committed atomically + * in the same batch. These mutations are applied to the WAL and applied to the memstore as well. + * The timestamp of the cells in the given Mutations MUST be obtained from the original mutation. 
+ * + * @param index the index of the original mutation, relative to the start of this mini-batch + * @param newOperations the Mutations to add + */ + public void addOperationsFromCP(int index, Mutation[] newOperations) { + if (this.operationsFromCoprocessors == null) { + // lazy allocation to save on object allocation in case this is not used + this.operationsFromCoprocessors = new Mutation[operations.length][]; + } + this.operationsFromCoprocessors[getAbsoluteIndex(index)] = newOperations; + } + + /** + * Returns the Mutations a coprocessor attached to the Mutation at the given index. + * + * @param index the index of the original mutation, relative to the start of this mini-batch + * @return the Mutations registered for that index, or null if none were registered + */ + public Mutation[] getOperationsFromCoprocessors(int index) { + return operationsFromCoprocessors == null ? null : + operationsFromCoprocessors[getAbsoluteIndex(index)]; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiRowMutationProcessor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiRowMutationProcessor.java index 1947a1b4a08..995ea933f55 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiRowMutationProcessor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiRowMutationProcessor.java @@ -60,7 +60,7 @@ MultiRowMutationProcessorResponse> { public boolean readOnly() { return false; } - + @Override public MultiRowMutationProcessorResponse getResult() { return MultiRowMutationProcessorResponse.getDefaultInstance(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverForAddingMutationsFromCoprocessors.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverForAddingMutationsFromCoprocessors.java new file mode 100644 index 00000000000..0663b0b1248 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverForAddingMutationsFromCoprocessors.java @@ -0,0 +1,282 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.coprocessor; + +import static org.junit.Assert.*; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; +import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.wal.WALKey; +import org.junit.AfterClass; +import 
org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +import com.google.common.collect.Lists; + +@Category(MediumTests.class) +public class TestRegionObserverForAddingMutationsFromCoprocessors { + + private static final Log LOG + = LogFactory.getLog(TestRegionObserverForAddingMutationsFromCoprocessors.class); + + private static HBaseTestingUtility util; + private static final byte[] dummy = Bytes.toBytes("dummy"); + private static final byte[] row1 = Bytes.toBytes("r1"); + private static final byte[] row2 = Bytes.toBytes("r2"); + private static final byte[] row3 = Bytes.toBytes("r3"); + private static final byte[] test = Bytes.toBytes("test"); + + @Rule + public TestName name = new TestName(); + private TableName tableName; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + Configuration conf = HBaseConfiguration.create(); + conf.set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY, TestWALObserver.class.getName()); + util = new HBaseTestingUtility(conf); + util.startMiniCluster(); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + util.shutdownMiniCluster(); + } + + @Before + public void setUp() throws Exception { + tableName = TableName.valueOf(name.getMethodName()); + } + + private void createTable(String coprocessor) throws IOException { + HTableDescriptor htd = new HTableDescriptor(tableName) + .addFamily(new HColumnDescriptor(dummy)) + .addFamily(new HColumnDescriptor(test)) + .addCoprocessor(coprocessor); + util.getHBaseAdmin().createTable(htd); + } + + /** + * Tests that Puts added by a coprocessor are applied together with the client Put. 
+ * @throws Exception if table setup or the scan fails + */ + @Test + public void testMulti() throws Exception { + createTable(TestMultiMutationCoprocessor.class.getName()); + + try (Table t = util.getConnection().getTable(tableName)) { + t.put(new Put(row1).addColumn(test, dummy, dummy)); + assertRowCount(t, 3); + } + } + + /** + * Tests that added mutations from coprocessors end up in the WAL. + */ + @Test + public void testCPMutationsAreWrittenToWALEdit() throws Exception { + createTable(TestMultiMutationCoprocessor.class.getName()); + + try (Table t = util.getConnection().getTable(tableName)) { + t.put(new Put(row1).addColumn(test, dummy, dummy)); + assertRowCount(t, 3); + } + + assertNotNull(TestWALObserver.savedEdit); + assertEquals(4, TestWALObserver.savedEdit.getCells().size()); + } + + private static void assertRowCount(Table t, int expected) throws IOException { + try (ResultScanner scanner = t.getScanner(new Scan())) { + int i = 0; + for (Result r: scanner) { + LOG.info(r.toString()); + i++; + } + assertEquals(expected, i); + } + } + + @Test + public void testDeleteCell() throws Exception { + createTable(TestDeleteCellCoprocessor.class.getName()); + + try (Table t = util.getConnection().getTable(tableName)) { + t.put(Lists.newArrayList( + new Put(row1).addColumn(test, dummy, dummy), + new Put(row2).addColumn(test, dummy, dummy), + new Put(row3).addColumn(test, dummy, dummy) + )); + + assertRowCount(t, 3); + + t.delete(new Delete(test).addColumn(test, dummy)); // non-existing row; the coprocessor adds Deletes for r1 and r2 + assertRowCount(t, 1); + } + } + + @Test + public void testDeleteFamily() throws Exception { + createTable(TestDeleteFamilyCoprocessor.class.getName()); + + try (Table t = util.getConnection().getTable(tableName)) { + t.put(Lists.newArrayList( + new Put(row1).addColumn(test, dummy, dummy), + new Put(row2).addColumn(test, dummy, dummy), + new Put(row3).addColumn(test, dummy, dummy) + )); + + assertRowCount(t, 3); + + t.delete(new Delete(test).addFamily(test)); // non-existing row; the coprocessor adds Deletes for r1 and r2 + 
 assertRowCount(t, 1); + } + } + + @Test + public void testDeleteRow() throws Exception { + createTable(TestDeleteRowCoprocessor.class.getName()); + + try (Table t = util.getConnection().getTable(tableName)) { + t.put(Lists.newArrayList( + new Put(row1).addColumn(test, dummy, dummy), + new Put(row2).addColumn(test, dummy, dummy), + new Put(row3).addColumn(test, dummy, dummy) + )); + + assertRowCount(t, 3); + + t.delete(new Delete(test).addColumn(test, dummy)); // non-existing row; the coprocessor adds Deletes for r1 and r2 + assertRowCount(t, 1); + } + } + + public static class TestMultiMutationCoprocessor extends BaseRegionObserver { + @Override + public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, + MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { + Mutation mut = miniBatchOp.getOperation(0); + List<Cell> cells = mut.getFamilyCellMap().get(test); + Put[] puts = new Put[] { + new Put(row1).addColumn(test, dummy, cells.get(0).getTimestamp(), + Bytes.toBytes("cpdummy")), + new Put(row2).addColumn(test, dummy, cells.get(0).getTimestamp(), dummy), + new Put(row3).addColumn(test, dummy, cells.get(0).getTimestamp(), dummy), + }; + LOG.info("Putting:" + Arrays.toString(puts)); + miniBatchOp.addOperationsFromCP(0, puts); + } + } + + public static class TestDeleteCellCoprocessor extends BaseRegionObserver { + @Override + public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, + MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { + Mutation mut = miniBatchOp.getOperation(0); + + if (mut instanceof Delete) { + List<Cell> cells = mut.getFamilyCellMap().get(test); + Delete[] deletes = new Delete[] { + // delete only 2 rows + new Delete(row1).addColumns(test, dummy, cells.get(0).getTimestamp()), + new Delete(row2).addColumns(test, dummy, cells.get(0).getTimestamp()), + }; + LOG.info("Deleting:" + Arrays.toString(deletes)); + miniBatchOp.addOperationsFromCP(0, deletes); + } + } + } + + public static class TestDeleteFamilyCoprocessor extends BaseRegionObserver { + @Override + public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, + 
 MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { + Mutation mut = miniBatchOp.getOperation(0); + + if (mut instanceof Delete) { + List<Cell> cells = mut.getFamilyCellMap().get(test); + Delete[] deletes = new Delete[] { + // delete only 2 rows + new Delete(row1).addFamily(test, cells.get(0).getTimestamp()), + new Delete(row2).addFamily(test, cells.get(0).getTimestamp()), + }; + LOG.info("Deleting:" + Arrays.toString(deletes)); + miniBatchOp.addOperationsFromCP(0, deletes); + } + } + } + + public static class TestDeleteRowCoprocessor extends BaseRegionObserver { + @Override + public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, + MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { + Mutation mut = miniBatchOp.getOperation(0); + + if (mut instanceof Delete) { + List<Cell> cells = mut.getFamilyCellMap().get(test); + Delete[] deletes = new Delete[] { + // delete only 2 rows + new Delete(row1, cells.get(0).getTimestamp()), + new Delete(row2, cells.get(0).getTimestamp()), + }; + LOG.info("Deleting:" + Arrays.toString(deletes)); + miniBatchOp.addOperationsFromCP(0, deletes); + } + } + } + + public static class TestWALObserver extends BaseWALObserver { + static WALEdit savedEdit = null; + @Override + public void postWALWrite(ObserverContext ctx, + HRegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException { + if (info.getTable().equals(TableName.valueOf("testCPMutationsAreWrittenToWALEdit"))) { + savedEdit = logEdit; + } + super.postWALWrite(ctx, info, logKey, logEdit); + } + } +}