HBASE-15600 Add provision for adding mutations to memstore or able to write to same region in batchMutate coprocessor hooks (Rajeshbabu and Enis)

This commit is contained in:
Enis Soztutar 2016-06-07 20:12:51 -07:00
parent 3a95552cfe
commit d05a3722c8
4 changed files with 370 additions and 19 deletions

View File

@ -3134,6 +3134,35 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive); batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
if (coprocessorHost.preBatchMutate(miniBatchOp)) { if (coprocessorHost.preBatchMutate(miniBatchOp)) {
return 0L; return 0L;
} else {
for (int i = firstIndex; i < lastIndexExclusive; i++) {
if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.NOT_RUN) {
// lastIndexExclusive was incremented above.
continue;
}
// we pass (i - firstIndex) below since the call expects a relative index
Mutation[] cpMutations = miniBatchOp.getOperationsFromCoprocessors(i - firstIndex);
if (cpMutations == null) {
continue;
}
// Else Coprocessor added more Mutations corresponding to the Mutation at this index.
for (int j = 0; j < cpMutations.length; j++) {
Mutation cpMutation = cpMutations[j];
Map<byte[], List<Cell>> cpFamilyMap = cpMutation.getFamilyCellMap();
checkAndPrepareMutation(cpMutation, replay, cpFamilyMap, now);
// Acquire row locks. If not, the whole batch will fail.
acquiredRowLocks.add(getRowLockInternal(cpMutation.getRow(), true));
if (cpMutation.getDurability() == Durability.SKIP_WAL) {
recordMutationWithoutWal(cpFamilyMap);
}
// Returned mutations from coprocessor correspond to the Mutation at index i. We can
// directly add the cells from those mutations to the familyMaps of this mutation.
mergeFamilyMaps(familyMaps[i], cpFamilyMap); // will get added to the memstore later
}
}
} }
} }
@ -3311,8 +3340,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// after the put is done // after the put is done
MiniBatchOperationInProgress<Mutation> miniBatchOp = MiniBatchOperationInProgress<Mutation> miniBatchOp =
new MiniBatchOperationInProgress<Mutation>(batchOp.getMutationsForCoprocs(), new MiniBatchOperationInProgress<Mutation>(batchOp.getMutationsForCoprocs(),
batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
lastIndexExclusive);
coprocessorHost.postBatchMutateIndispensably(miniBatchOp, success); coprocessorHost.postBatchMutateIndispensably(miniBatchOp, success);
} }
@ -3320,6 +3348,18 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
} }
} }
private void mergeFamilyMaps(Map<byte[], List<Cell>> familyMap,
Map<byte[], List<Cell>> toBeMerged) {
for (Map.Entry<byte[], List<Cell>> entry : toBeMerged.entrySet()) {
List<Cell> cells = familyMap.get(entry.getKey());
if (cells == null) {
familyMap.put(entry.getKey(), entry.getValue());
} else {
cells.addAll(entry.getValue());
}
}
}
private void appendCurrentNonces(final Mutation mutation, final boolean replay, private void appendCurrentNonces(final Mutation mutation, final boolean replay,
final WALEdit walEdit, final long now, final long currentNonceGroup, final long currentNonce) final WALEdit walEdit, final long now, final long currentNonceGroup, final long currentNonce)
throws IOException { throws IOException {
@ -3348,18 +3388,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
familyMaps[lastIndexExclusive] = familyMap; familyMaps[lastIndexExclusive] = familyMap;
try { try {
if (mutation instanceof Put) { checkAndPrepareMutation(mutation, batchOp.isInReplay(), familyMap, now);
// Check the families in the put. If bad, skip this one.
if (batchOp.isInReplay()) {
removeNonExistentColumnFamilyForReplay(familyMap);
} else {
checkFamilies(familyMap.keySet());
}
checkTimestamps(mutation.getFamilyCellMap(), now);
} else {
prepareDelete((Delete)mutation);
}
checkRow(mutation.getRow(), "doMiniBatchMutation");
} catch (NoSuchColumnFamilyException nscf) { } catch (NoSuchColumnFamilyException nscf) {
LOG.warn("No such column family in batch mutation", nscf); LOG.warn("No such column family in batch mutation", nscf);
batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus( batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus(
@ -3379,6 +3408,23 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
return skip; return skip;
} }
private void checkAndPrepareMutation(Mutation mutation, boolean replay,
final Map<byte[], List<Cell>> familyMap, final long now)
throws IOException {
if (mutation instanceof Put) {
// Check the families in the put. If bad, skip this one.
if (replay) {
removeNonExistentColumnFamilyForReplay(familyMap);
} else {
checkFamilies(familyMap.keySet());
}
checkTimestamps(mutation.getFamilyCellMap(), now);
} else {
prepareDelete((Delete)mutation);
}
checkRow(mutation.getRow(), "doMiniBatchMutation");
}
/** /**
* During replay, there could exist column families which are removed between region server * During replay, there could exist column families which are removed between region server
* failure and replay * failure and replay

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.regionserver; package org.apache.hadoop.hbase.regionserver;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
/** /**
@ -29,9 +30,10 @@ import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
* ObserverContext, MiniBatchOperationInProgress) * ObserverContext, MiniBatchOperationInProgress)
* @param T Pair&lt;Mutation, Integer&gt; pair of Mutations and associated rowlock ids . * @param T Pair&lt;Mutation, Integer&gt; pair of Mutations and associated rowlock ids .
*/ */
@InterfaceAudience.Private @InterfaceAudience.LimitedPrivate("Coprocessors")
public class MiniBatchOperationInProgress<T> { public class MiniBatchOperationInProgress<T> {
private final T[] operations; private final T[] operations;
private Mutation[][] operationsFromCoprocessors;
private final OperationStatus[] retCodeDetails; private final OperationStatus[] retCodeDetails;
private final WALEdit[] walEditsFromCoprocessors; private final WALEdit[] walEditsFromCoprocessors;
private final int firstIndex; private final int firstIndex;
@ -103,4 +105,25 @@ public class MiniBatchOperationInProgress<T> {
} }
return this.firstIndex + index; return this.firstIndex + index;
} }
/**
* Add more Mutations corresponding to the Mutation at the given index to be committed atomically
* in the same batch. These mutations are applied to the WAL and applied to the memstore as well.
* The timestamp of the cells in the given Mutations MUST be obtained from the original mutation.
*
* @param index the index that corresponds to the original mutation index in the batch
* @param newOperations the Mutations to add
*/
public void addOperationsFromCP(int index, Mutation[] newOperations) {
if (this.operationsFromCoprocessors == null) {
// lazy allocation to save on object allocation in case this is not used
this.operationsFromCoprocessors = new Mutation[operations.length][];
}
this.operationsFromCoprocessors[getAbsoluteIndex(index)] = newOperations;
}
public Mutation[] getOperationsFromCoprocessors(int index) {
return operationsFromCoprocessors == null ? null :
operationsFromCoprocessors[getAbsoluteIndex(index)];
}
} }

View File

@ -0,0 +1,282 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.coprocessor;
import static org.junit.Assert.*;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WALKey;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import com.google.common.collect.Lists;
@Category(MediumTests.class)
public class TestRegionObserverForAddingMutationsFromCoprocessors {
private static final Log LOG
= LogFactory.getLog(TestRegionObserverForAddingMutationsFromCoprocessors.class);
private static HBaseTestingUtility util;
private static final byte[] dummy = Bytes.toBytes("dummy");
private static final byte[] row1 = Bytes.toBytes("r1");
private static final byte[] row2 = Bytes.toBytes("r2");
private static final byte[] row3 = Bytes.toBytes("r3");
private static final byte[] test = Bytes.toBytes("test");
@Rule
public TestName name = new TestName();
private TableName tableName;
@BeforeClass
public static void setUpBeforeClass() throws Exception {
Configuration conf = HBaseConfiguration.create();
conf.set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY, TestWALObserver.class.getName());
util = new HBaseTestingUtility(conf);
util.startMiniCluster();
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
util.shutdownMiniCluster();
}
@Before
public void setUp() throws Exception {
tableName = TableName.valueOf(name.getMethodName());
}
private void createTable(String coprocessor) throws IOException {
HTableDescriptor htd = new HTableDescriptor(tableName)
.addFamily(new HColumnDescriptor(dummy))
.addFamily(new HColumnDescriptor(test))
.addCoprocessor(coprocessor);
util.getAdmin().createTable(htd);
}
/**
* Test various multiput operations.
* @throws Exception
*/
@Test
public void testMulti() throws Exception {
createTable(TestMultiMutationCoprocessor.class.getName());
try (Table t = util.getConnection().getTable(tableName)) {
t.put(new Put(row1).addColumn(test, dummy, dummy));
assertRowCount(t, 3);
}
}
/**
* Tests that added mutations from coprocessors end up in the WAL.
*/
@Test
public void testCPMutationsAreWrittenToWALEdit() throws Exception {
createTable(TestMultiMutationCoprocessor.class.getName());
try (Table t = util.getConnection().getTable(tableName)) {
t.put(new Put(row1).addColumn(test, dummy, dummy));
assertRowCount(t, 3);
}
assertNotNull(TestWALObserver.savedEdit);
assertEquals(4, TestWALObserver.savedEdit.getCells().size());
}
private static void assertRowCount(Table t, int expected) throws IOException {
try (ResultScanner scanner = t.getScanner(new Scan())) {
int i = 0;
for (Result r: scanner) {
LOG.info(r.toString());
i++;
}
assertEquals(expected, i);
}
}
@Test
public void testDeleteCell() throws Exception {
createTable(TestDeleteCellCoprocessor.class.getName());
try (Table t = util.getConnection().getTable(tableName)) {
t.put(Lists.newArrayList(
new Put(row1).addColumn(test, dummy, dummy),
new Put(row2).addColumn(test, dummy, dummy),
new Put(row3).addColumn(test, dummy, dummy)
));
assertRowCount(t, 3);
t.delete(new Delete(test).addColumn(test, dummy)); // delete non-existing row
assertRowCount(t, 1);
}
}
@Test
public void testDeleteFamily() throws Exception {
createTable(TestDeleteFamilyCoprocessor.class.getName());
try (Table t = util.getConnection().getTable(tableName)) {
t.put(Lists.newArrayList(
new Put(row1).addColumn(test, dummy, dummy),
new Put(row2).addColumn(test, dummy, dummy),
new Put(row3).addColumn(test, dummy, dummy)
));
assertRowCount(t, 3);
t.delete(new Delete(test).addFamily(test)); // delete non-existing row
assertRowCount(t, 1);
}
}
@Test
public void testDeleteRow() throws Exception {
createTable(TestDeleteRowCoprocessor.class.getName());
try (Table t = util.getConnection().getTable(tableName)) {
t.put(Lists.newArrayList(
new Put(row1).addColumn(test, dummy, dummy),
new Put(row2).addColumn(test, dummy, dummy),
new Put(row3).addColumn(test, dummy, dummy)
));
assertRowCount(t, 3);
t.delete(new Delete(test).addColumn(test, dummy)); // delete non-existing row
assertRowCount(t, 1);
}
}
public static class TestMultiMutationCoprocessor extends BaseRegionObserver {
@Override
public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
Mutation mut = miniBatchOp.getOperation(0);
List<Cell> cells = mut.getFamilyCellMap().get(test);
Put[] puts = new Put[] {
new Put(row1).addColumn(test, dummy, cells.get(0).getTimestamp(),
Bytes.toBytes("cpdummy")),
new Put(row2).addColumn(test, dummy, cells.get(0).getTimestamp(), dummy),
new Put(row3).addColumn(test, dummy, cells.get(0).getTimestamp(), dummy),
};
LOG.info("Putting:" + puts);
miniBatchOp.addOperationsFromCP(0, puts);
}
}
public static class TestDeleteCellCoprocessor extends BaseRegionObserver {
@Override
public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
Mutation mut = miniBatchOp.getOperation(0);
if (mut instanceof Delete) {
List<Cell> cells = mut.getFamilyCellMap().get(test);
Delete[] deletes = new Delete[] {
// delete only 2 rows
new Delete(row1).addColumns(test, dummy, cells.get(0).getTimestamp()),
new Delete(row2).addColumns(test, dummy, cells.get(0).getTimestamp()),
};
LOG.info("Deleting:" + Arrays.toString(deletes));
miniBatchOp.addOperationsFromCP(0, deletes);
}
}
}
public static class TestDeleteFamilyCoprocessor extends BaseRegionObserver {
@Override
public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
Mutation mut = miniBatchOp.getOperation(0);
if (mut instanceof Delete) {
List<Cell> cells = mut.getFamilyCellMap().get(test);
Delete[] deletes = new Delete[] {
// delete only 2 rows
new Delete(row1).addFamily(test, cells.get(0).getTimestamp()),
new Delete(row2).addFamily(test, cells.get(0).getTimestamp()),
};
LOG.info("Deleting:" + Arrays.toString(deletes));
miniBatchOp.addOperationsFromCP(0, deletes);
}
}
}
public static class TestDeleteRowCoprocessor extends BaseRegionObserver {
@Override
public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
Mutation mut = miniBatchOp.getOperation(0);
if (mut instanceof Delete) {
List<Cell> cells = mut.getFamilyCellMap().get(test);
Delete[] deletes = new Delete[] {
// delete only 2 rows
new Delete(row1, cells.get(0).getTimestamp()),
new Delete(row2, cells.get(0).getTimestamp()),
};
LOG.info("Deleting:" + Arrays.toString(deletes));
miniBatchOp.addOperationsFromCP(0, deletes);
}
}
}
public static class TestWALObserver extends BaseWALObserver {
static WALEdit savedEdit = null;
@Override
public void postWALWrite(ObserverContext<? extends WALCoprocessorEnvironment> ctx,
HRegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException {
if (info.getTable().equals(TableName.valueOf("testCPMutationsAreWrittenToWALEdit"))) {
savedEdit = logEdit;
}
super.postWALWrite(ctx, info, logKey, logEdit);
}
}
}