HBASE-21643 Introduce two new region coprocessor method and deprecated postMutationBeforeWAL

This commit is contained in:
Guanghao Zhang 2018-12-26 17:42:02 +08:00
parent c2d5991b82
commit f5ea00f724
5 changed files with 134 additions and 28 deletions

View File

@ -20,6 +20,7 @@
package org.apache.hadoop.hbase.coprocessor; package org.apache.hadoop.hbase.coprocessor;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -1029,12 +1030,58 @@ public interface RegionObserver {
* @param oldCell old cell containing previous value * @param oldCell old cell containing previous value
* @param newCell the new cell containing the computed value * @param newCell the new cell containing the computed value
* @return the new cell, possibly changed * @return the new cell, possibly changed
* @deprecated Use {@link #postIncrementBeforeWAL} or {@link #postAppendBeforeWAL} instead.
*/ */
@Deprecated
default Cell postMutationBeforeWAL(ObserverContext<RegionCoprocessorEnvironment> ctx, default Cell postMutationBeforeWAL(ObserverContext<RegionCoprocessorEnvironment> ctx,
MutationType opType, Mutation mutation, Cell oldCell, Cell newCell) throws IOException { MutationType opType, Mutation mutation, Cell oldCell, Cell newCell) throws IOException {
return newCell; return newCell;
} }
/**
* Called after a list of new cells has been created during an increment operation, but before
* they are committed to the WAL or memstore.
*
* @param ctx the environment provided by the region server
* @param mutation the current mutation
* @param cellPairs a list of cell pair. The first cell is old cell which may be null.
* And the second cell is the new cell.
* @return a list of cell pair, possibly changed.
*/
default List<Pair<Cell, Cell>> postIncrementBeforeWAL(
ObserverContext<RegionCoprocessorEnvironment> ctx, Mutation mutation,
List<Pair<Cell, Cell>> cellPairs) throws IOException {
List<Pair<Cell, Cell>> resultPairs = new ArrayList<>(cellPairs.size());
for (Pair<Cell, Cell> pair : cellPairs) {
resultPairs.add(new Pair<>(pair.getFirst(),
postMutationBeforeWAL(ctx, MutationType.INCREMENT, mutation, pair.getFirst(),
pair.getSecond())));
}
return resultPairs;
}
/**
* Called after a list of new cells has been created during an append operation, but before
* they are committed to the WAL or memstore.
*
* @param ctx the environment provided by the region server
* @param mutation the current mutation
* @param cellPairs a list of cell pair. The first cell is old cell which may be null.
* And the second cell is the new cell.
* @return a list of cell pair, possibly changed.
*/
default List<Pair<Cell, Cell>> postAppendBeforeWAL(
ObserverContext<RegionCoprocessorEnvironment> ctx, Mutation mutation,
List<Pair<Cell, Cell>> cellPairs) throws IOException {
List<Pair<Cell, Cell>> resultPairs = new ArrayList<>(cellPairs.size());
for (Pair<Cell, Cell> pair : cellPairs) {
resultPairs.add(new Pair<>(pair.getFirst(),
postMutationBeforeWAL(ctx, MutationType.INCREMENT, mutation, pair.getFirst(),
pair.getSecond())));
}
return resultPairs;
}
/** /**
* Called after the ScanQueryMatcher creates ScanDeleteTracker. Implementing * Called after the ScanQueryMatcher creates ScanDeleteTracker. Implementing
* this hook would help in creating customised DeleteTracker and returning * this hook would help in creating customised DeleteTracker and returning

View File

@ -70,6 +70,8 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function; import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
@ -120,7 +122,6 @@ import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.conf.ConfigurationManager; import org.apache.hadoop.hbase.conf.ConfigurationManager;
import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver; import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.RegionObserver.MutationType;
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare; import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare;
import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException; import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException; import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
@ -8014,7 +8015,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
Durability effectiveDurability, long now, List<Cell> deltas, List<Cell> results) Durability effectiveDurability, long now, List<Cell> deltas, List<Cell> results)
throws IOException { throws IOException {
byte[] columnFamily = store.getColumnFamilyDescriptor().getName(); byte[] columnFamily = store.getColumnFamilyDescriptor().getName();
List<Cell> toApply = new ArrayList<>(deltas.size()); List<Pair<Cell, Cell>> cellPairs = new ArrayList<>(deltas.size());
// Get previous values for all columns in this family. // Get previous values for all columns in this family.
TimeRange tr = null; TimeRange tr = null;
switch (op) { switch (op) {
@ -8041,18 +8042,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
currentValuesIndex++; currentValuesIndex++;
} }
} }
// Switch on whether this an increment or an append building the new Cell to apply. // Switch on whether this an increment or an append building the new Cell to apply.
Cell newCell = null; Cell newCell = null;
MutationType mutationType = null;
switch (op) { switch (op) {
case INCREMENT: case INCREMENT:
mutationType = MutationType.INCREMENT;
long deltaAmount = getLongValue(delta); long deltaAmount = getLongValue(delta);
final long newValue = currentValue == null ? deltaAmount : getLongValue(currentValue) + deltaAmount; final long newValue = currentValue == null ? deltaAmount : getLongValue(currentValue) + deltaAmount;
newCell = reckonDelta(delta, currentValue, columnFamily, now, mutation, (oldCell) -> Bytes.toBytes(newValue)); newCell = reckonDelta(delta, currentValue, columnFamily, now, mutation, (oldCell) -> Bytes.toBytes(newValue));
break; break;
case APPEND: case APPEND:
mutationType = MutationType.APPEND;
newCell = reckonDelta(delta, currentValue, columnFamily, now, mutation, (oldCell) -> newCell = reckonDelta(delta, currentValue, columnFamily, now, mutation, (oldCell) ->
ByteBuffer.wrap(new byte[delta.getValueLength() + oldCell.getValueLength()]) ByteBuffer.wrap(new byte[delta.getValueLength() + oldCell.getValueLength()])
.put(oldCell.getValueArray(), oldCell.getValueOffset(), oldCell.getValueLength()) .put(oldCell.getValueArray(), oldCell.getValueOffset(), oldCell.getValueLength())
@ -8063,18 +8062,21 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
default: throw new UnsupportedOperationException(op.toString()); default: throw new UnsupportedOperationException(op.toString());
} }
// Give coprocessors a chance to update the new cell cellPairs.add(new Pair<>(currentValue, newCell));
if (coprocessorHost != null) {
newCell =
coprocessorHost.postMutationBeforeWAL(mutationType, mutation, currentValue, newCell);
}
toApply.add(newCell);
// Add to results to get returned to the Client. If null, cilent does not want results. // Add to results to get returned to the Client. If null, cilent does not want results.
if (results != null) { if (results != null) {
results.add(newCell); results.add(newCell);
} }
} }
return toApply;
// Give coprocessors a chance to update the new cells before apply to WAL or memstore
if (coprocessorHost != null) {
// Here the operation must be increment or append.
cellPairs = op == Operation.INCREMENT ?
coprocessorHost.postIncrementBeforeWAL(mutation, cellPairs) :
coprocessorHost.postAppendBeforeWAL(mutation, cellPairs);
}
return cellPairs.stream().map(Pair::getSecond).collect(Collectors.toList());
} }
private static Cell reckonDelta(final Cell delta, final Cell currentCell, private static Cell reckonDelta(final Cell delta, final Cell currentCell,

View File

@ -67,7 +67,6 @@ import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver; import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.coprocessor.RegionObserver.MutationType;
import org.apache.hadoop.hbase.filter.ByteArrayComparable; import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.Reference; import org.apache.hadoop.hbase.io.Reference;
@ -1691,16 +1690,32 @@ public class RegionCoprocessorHost
}); });
} }
public Cell postMutationBeforeWAL(final MutationType opType, final Mutation mutation, public List<Pair<Cell, Cell>> postIncrementBeforeWAL(final Mutation mutation,
final Cell oldCell, Cell newCell) throws IOException { final List<Pair<Cell, Cell>> cellPairs) throws IOException {
if (this.coprocEnvironments.isEmpty()) { if (this.coprocEnvironments.isEmpty()) {
return newCell; return cellPairs;
} }
return execOperationWithResult( return execOperationWithResult(
new ObserverOperationWithResult<RegionObserver, Cell>(regionObserverGetter, newCell) { new ObserverOperationWithResult<RegionObserver, List<Pair<Cell, Cell>>>(
regionObserverGetter, cellPairs) {
@Override @Override
public Cell call(RegionObserver observer) throws IOException { public List<Pair<Cell, Cell>> call(RegionObserver observer) throws IOException {
return observer.postMutationBeforeWAL(this, opType, mutation, oldCell, getResult()); return observer.postIncrementBeforeWAL(this, mutation, getResult());
}
});
}
public List<Pair<Cell, Cell>> postAppendBeforeWAL(final Mutation mutation,
final List<Pair<Cell, Cell>> cellPairs) throws IOException {
if (this.coprocEnvironments.isEmpty()) {
return cellPairs;
}
return execOperationWithResult(
new ObserverOperationWithResult<RegionObserver, List<Pair<Cell, Cell>>>(
regionObserverGetter, cellPairs) {
@Override
public List<Pair<Cell, Cell>> call(RegionObserver observer) throws IOException {
return observer.postAppendBeforeWAL(this, mutation, getResult());
} }
}); });
} }

View File

@ -36,6 +36,7 @@ import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.TreeSet; import java.util.TreeSet;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ArrayBackedTag; import org.apache.hadoop.hbase.ArrayBackedTag;
@ -1849,14 +1850,34 @@ public class AccessController implements MasterCoprocessor, RegionCoprocessor,
} }
@Override @Override
public Cell postMutationBeforeWAL(ObserverContext<RegionCoprocessorEnvironment> ctx, public List<Pair<Cell, Cell>> postIncrementBeforeWAL(
MutationType opType, Mutation mutation, Cell oldCell, Cell newCell) throws IOException { ObserverContext<RegionCoprocessorEnvironment> ctx, Mutation mutation,
List<Pair<Cell, Cell>> cellPairs) throws IOException {
// If the HFile version is insufficient to persist tags, we won't have any // If the HFile version is insufficient to persist tags, we won't have any
// work to do here // work to do here
if (!cellFeaturesEnabled) { if (!cellFeaturesEnabled) {
return newCell; return cellPairs;
} }
return cellPairs.stream().map(pair -> new Pair<>(pair.getFirst(),
createNewCellWithTags(mutation, pair.getFirst(), pair.getSecond())))
.collect(Collectors.toList());
}
@Override
public List<Pair<Cell, Cell>> postAppendBeforeWAL(
ObserverContext<RegionCoprocessorEnvironment> ctx, Mutation mutation,
List<Pair<Cell, Cell>> cellPairs) throws IOException {
// If the HFile version is insufficient to persist tags, we won't have any
// work to do here
if (!cellFeaturesEnabled) {
return cellPairs;
}
return cellPairs.stream().map(pair -> new Pair<>(pair.getFirst(),
createNewCellWithTags(mutation, pair.getFirst(), pair.getSecond())))
.collect(Collectors.toList());
}
private Cell createNewCellWithTags(Mutation mutation, Cell oldCell, Cell newCell) {
// Collect any ACLs from the old cell // Collect any ACLs from the old cell
List<Tag> tags = Lists.newArrayList(); List<Tag> tags = Lists.newArrayList();
List<Tag> aclTags = Lists.newArrayList(); List<Tag> aclTags = Lists.newArrayList();
@ -1901,8 +1922,7 @@ public class AccessController implements MasterCoprocessor, RegionCoprocessor,
return newCell; return newCell;
} }
Cell rewriteCell = PrivateCellUtil.createCell(newCell, tags); return PrivateCellUtil.createCell(newCell, tags);
return rewriteCell;
} }
@Override @Override

View File

@ -127,6 +127,7 @@ import org.slf4j.LoggerFactory;
public class VisibilityController implements MasterCoprocessor, RegionCoprocessor, public class VisibilityController implements MasterCoprocessor, RegionCoprocessor,
VisibilityLabelsService.Interface, MasterObserver, RegionObserver { VisibilityLabelsService.Interface, MasterObserver, RegionObserver {
private static final Logger LOG = LoggerFactory.getLogger(VisibilityController.class); private static final Logger LOG = LoggerFactory.getLogger(VisibilityController.class);
private static final Logger AUDITLOG = LoggerFactory.getLogger("SecurityLogger." private static final Logger AUDITLOG = LoggerFactory.getLogger("SecurityLogger."
+ VisibilityController.class.getName()); + VisibilityController.class.getName());
@ -688,8 +689,30 @@ public class VisibilityController implements MasterCoprocessor, RegionCoprocesso
} }
@Override @Override
public Cell postMutationBeforeWAL(ObserverContext<RegionCoprocessorEnvironment> ctx, public List<Pair<Cell, Cell>> postIncrementBeforeWAL(
MutationType opType, Mutation mutation, Cell oldCell, Cell newCell) throws IOException { ObserverContext<RegionCoprocessorEnvironment> ctx, Mutation mutation,
List<Pair<Cell, Cell>> cellPairs) throws IOException {
List<Pair<Cell, Cell>> resultPairs = new ArrayList<>(cellPairs.size());
for (Pair<Cell, Cell> pair : cellPairs) {
resultPairs
.add(new Pair<>(pair.getFirst(), createNewCellWithTags(mutation, pair.getSecond())));
}
return resultPairs;
}
@Override
public List<Pair<Cell, Cell>> postAppendBeforeWAL(
ObserverContext<RegionCoprocessorEnvironment> ctx, Mutation mutation,
List<Pair<Cell, Cell>> cellPairs) throws IOException {
List<Pair<Cell, Cell>> resultPairs = new ArrayList<>(cellPairs.size());
for (Pair<Cell, Cell> pair : cellPairs) {
resultPairs
.add(new Pair<>(pair.getFirst(), createNewCellWithTags(mutation, pair.getSecond())));
}
return resultPairs;
}
private Cell createNewCellWithTags(Mutation mutation, Cell newCell) throws IOException {
List<Tag> tags = Lists.newArrayList(); List<Tag> tags = Lists.newArrayList();
CellVisibility cellVisibility = null; CellVisibility cellVisibility = null;
try { try {
@ -715,8 +738,7 @@ public class VisibilityController implements MasterCoprocessor, RegionCoprocesso
} }
} }
Cell rewriteCell = PrivateCellUtil.createCell(newCell, tags); return PrivateCellUtil.createCell(newCell, tags);
return rewriteCell;
} }
@Override @Override