HBASE-18546 Always overwrite the TS for Append/Increment unless no existing cells are found

This commit is contained in:
Chia-Ping Tsai 2017-08-24 14:23:36 +08:00
parent 3b444a066c
commit 25ee5f7f84
8 changed files with 395 additions and 397 deletions

View File

@ -27,6 +27,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NavigableSet;
import java.util.function.Function;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
@ -616,6 +617,62 @@ public final class ProtobufUtil {
return delete;
}
@FunctionalInterface
private interface ConsumerWithException <T, U> {
void accept(T t, U u) throws IOException;
}
private static <T extends Mutation> T toDelta(Function<Bytes, T> supplier, ConsumerWithException<T, Cell> consumer,
final MutationProto proto, final CellScanner cellScanner) throws IOException {
byte[] row = proto.hasRow() ? proto.getRow().toByteArray() : null;
T mutation = row == null ? null : supplier.apply(new Bytes(row));
int cellCount = proto.hasAssociatedCellCount() ? proto.getAssociatedCellCount() : 0;
if (cellCount > 0) {
// The proto has metadata only and the data is separate to be found in the cellScanner.
if (cellScanner == null) {
throw new DoNotRetryIOException("Cell count of " + cellCount + " but no cellScanner: " +
toShortString(proto));
}
for (int i = 0; i < cellCount; i++) {
if (!cellScanner.advance()) {
throw new DoNotRetryIOException("Cell count of " + cellCount + " but at index " + i +
" no cell returned: " + toShortString(proto));
}
Cell cell = cellScanner.current();
if (mutation == null) {
mutation = supplier.apply(new Bytes(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
}
consumer.accept(mutation, cell);
}
} else {
if (mutation == null) {
throw new IllegalArgumentException("row cannot be null");
}
for (ColumnValue column : proto.getColumnValueList()) {
byte[] family = column.getFamily().toByteArray();
for (QualifierValue qv : column.getQualifierValueList()) {
byte[] qualifier = qv.getQualifier().toByteArray();
if (!qv.hasValue()) {
throw new DoNotRetryIOException(
"Missing required field: qualifier value");
}
byte[] value = qv.getValue().toByteArray();
byte[] tags = null;
if (qv.hasTags()) {
tags = qv.getTags().toByteArray();
}
consumer.accept(mutation, CellUtil.createCell(mutation.getRow(), family, qualifier, qv.getTimestamp(),
KeyValue.Type.Put, value, tags));
}
}
}
mutation.setDurability(toDurability(proto.getDurability()));
for (NameBytesPair attribute : proto.getAttributeList()) {
mutation.setAttribute(attribute.getName(), attribute.getValue().toByteArray());
}
return mutation;
}
/**
* Convert a protocol buffer Mutate to an Append
* @param cellScanner
@ -624,54 +681,31 @@ public final class ProtobufUtil {
* @throws IOException
*/
public static Append toAppend(final MutationProto proto, final CellScanner cellScanner)
throws IOException {
throws IOException {
MutationType type = proto.getMutateType();
assert type == MutationType.APPEND : type.name();
byte [] row = proto.hasRow()? proto.getRow().toByteArray(): null;
Append append = null;
int cellCount = proto.hasAssociatedCellCount()? proto.getAssociatedCellCount(): 0;
if (cellCount > 0) {
// The proto has metadata only and the data is separate to be found in the cellScanner.
if (cellScanner == null) {
throw new DoNotRetryIOException("Cell count of " + cellCount + " but no cellScanner: " +
toShortString(proto));
}
for (int i = 0; i < cellCount; i++) {
if (!cellScanner.advance()) {
throw new DoNotRetryIOException("Cell count of " + cellCount + " but at index " + i +
" no cell returned: " + toShortString(proto));
}
Cell cell = cellScanner.current();
if (append == null) {
append = new Append(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
}
append.add(cell);
}
} else {
append = new Append(row);
for (ColumnValue column: proto.getColumnValueList()) {
byte[] family = column.getFamily().toByteArray();
for (QualifierValue qv: column.getQualifierValueList()) {
byte[] qualifier = qv.getQualifier().toByteArray();
if (!qv.hasValue()) {
throw new DoNotRetryIOException(
"Missing required field: qualifier value");
}
byte[] value = qv.getValue().toByteArray();
byte[] tags = null;
if (qv.hasTags()) {
tags = qv.getTags().toByteArray();
}
append.add(CellUtil.createCell(row, family, qualifier, qv.getTimestamp(),
KeyValue.Type.Put, value, tags));
}
}
return toDelta((Bytes row) -> new Append(row.get(), row.getOffset(), row.getLength()),
Append::add, proto, cellScanner);
}
/**
* Convert a protocol buffer Mutate to an Increment
*
* @param proto the protocol buffer Mutate to convert
* @return the converted client Increment
* @throws IOException
*/
public static Increment toIncrement(final MutationProto proto, final CellScanner cellScanner)
throws IOException {
MutationType type = proto.getMutateType();
assert type == MutationType.INCREMENT : type.name();
Increment increment = toDelta((Bytes row) -> new Increment(row.get(), row.getOffset(), row.getLength()),
Increment::add, proto, cellScanner);
if (proto.hasTimeRange()) {
TimeRange timeRange = protoToTimeRange(proto.getTimeRange());
increment.setTimeRange(timeRange.getMin(), timeRange.getMax());
}
append.setDurability(toDurability(proto.getDurability()));
for (NameBytesPair attribute: proto.getAttributeList()) {
append.setAttribute(attribute.getName(), attribute.getValue().toByteArray());
}
return append;
return increment;
}
/**
@ -695,67 +729,6 @@ public final class ProtobufUtil {
throw new IOException("Unknown mutation type " + type);
}
/**
* Convert a protocol buffer Mutate to an Increment
*
* @param proto the protocol buffer Mutate to convert
* @return the converted client Increment
* @throws IOException
*/
public static Increment toIncrement(final MutationProto proto, final CellScanner cellScanner)
throws IOException {
MutationType type = proto.getMutateType();
assert type == MutationType.INCREMENT : type.name();
byte [] row = proto.hasRow()? proto.getRow().toByteArray(): null;
Increment increment = null;
int cellCount = proto.hasAssociatedCellCount()? proto.getAssociatedCellCount(): 0;
if (cellCount > 0) {
// The proto has metadata only and the data is separate to be found in the cellScanner.
if (cellScanner == null) {
throw new DoNotRetryIOException("Cell count of " + cellCount + " but no cellScanner: " +
TextFormat.shortDebugString(proto));
}
for (int i = 0; i < cellCount; i++) {
if (!cellScanner.advance()) {
throw new DoNotRetryIOException("Cell count of " + cellCount + " but at index " + i +
" no cell returned: " + TextFormat.shortDebugString(proto));
}
Cell cell = cellScanner.current();
if (increment == null) {
increment = new Increment(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
}
increment.add(cell);
}
} else {
increment = new Increment(row);
for (ColumnValue column: proto.getColumnValueList()) {
byte[] family = column.getFamily().toByteArray();
for (QualifierValue qv: column.getQualifierValueList()) {
byte[] qualifier = qv.getQualifier().toByteArray();
if (!qv.hasValue()) {
throw new DoNotRetryIOException("Missing required field: qualifier value");
}
byte[] value = qv.getValue().toByteArray();
byte[] tags = null;
if (qv.hasTags()) {
tags = qv.getTags().toByteArray();
}
increment.add(CellUtil.createCell(row, family, qualifier, qv.getTimestamp(),
KeyValue.Type.Put, value, tags));
}
}
}
if (proto.hasTimeRange()) {
TimeRange timeRange = protoToTimeRange(proto.getTimeRange());
increment.setTimeRange(timeRange.getMin(), timeRange.getMax());
}
increment.setDurability(toDurability(proto.getDurability()));
for (NameBytesPair attribute : proto.getAttributeList()) {
increment.setAttribute(attribute.getName(), attribute.getValue().toByteArray());
}
return increment;
}
/**
* Convert a protocol buffer Mutate to a Get.
* @param proto the protocol buffer Mutate to convert.
@ -1137,56 +1110,6 @@ public final class ProtobufUtil {
}
}
/**
* Convert a client Increment to a protobuf Mutate.
*
* @param increment
* @return the converted mutate
*/
public static MutationProto toMutation(
final Increment increment, final MutationProto.Builder builder, long nonce) {
builder.setRow(ByteStringer.wrap(increment.getRow()));
builder.setMutateType(MutationType.INCREMENT);
builder.setDurability(toDurability(increment.getDurability()));
if (nonce != HConstants.NO_NONCE) {
builder.setNonce(nonce);
}
TimeRange timeRange = increment.getTimeRange();
setTimeRange(builder, timeRange);
ColumnValue.Builder columnBuilder = ColumnValue.newBuilder();
QualifierValue.Builder valueBuilder = QualifierValue.newBuilder();
for (Map.Entry<byte[], List<Cell>> family: increment.getFamilyCellMap().entrySet()) {
columnBuilder.setFamily(ByteStringer.wrap(family.getKey()));
columnBuilder.clearQualifierValue();
List<Cell> values = family.getValue();
if (values != null && values.size() > 0) {
for (Cell cell: values) {
valueBuilder.clear();
valueBuilder.setQualifier(ByteStringer.wrap(
cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()));
valueBuilder.setValue(ByteStringer.wrap(
cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
if (cell.getTagsLength() > 0) {
valueBuilder.setTags(ByteStringer.wrap(cell.getTagsArray(),
cell.getTagsOffset(), cell.getTagsLength()));
}
columnBuilder.addQualifierValue(valueBuilder.build());
}
}
builder.addColumnValue(columnBuilder.build());
}
Map<String, byte[]> attributes = increment.getAttributesMap();
if (!attributes.isEmpty()) {
NameBytesPair.Builder attributeBuilder = NameBytesPair.newBuilder();
for (Map.Entry<String, byte[]> attribute : attributes.entrySet()) {
attributeBuilder.setName(attribute.getKey());
attributeBuilder.setValue(ByteStringer.wrap(attribute.getValue()));
builder.addAttribute(attributeBuilder.build());
}
}
return builder.build();
}
public static MutationProto toMutation(final MutationType type, final Mutation mutation)
throws IOException {
return toMutation(type, mutation, HConstants.NO_NONCE);
@ -1217,6 +1140,10 @@ public final class ProtobufUtil {
if (nonce != HConstants.NO_NONCE) {
builder.setNonce(nonce);
}
if (type == MutationType.INCREMENT) {
TimeRange timeRange = ((Increment) mutation).getTimeRange();
setTimeRange(builder, timeRange);
}
ColumnValue.Builder columnBuilder = ColumnValue.newBuilder();
QualifierValue.Builder valueBuilder = QualifierValue.newBuilder();
for (Map.Entry<byte[],List<Cell>> family: mutation.getFamilyCellMap().entrySet()) {

View File

@ -34,6 +34,7 @@ import java.util.NavigableSet;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
@ -741,6 +742,61 @@ public final class ProtobufUtil {
}
return delete;
}
@FunctionalInterface
private interface ConsumerWithException <T, U> {
void accept(T t, U u) throws IOException;
}
private static <T extends Mutation> T toDelta(Function<Bytes, T> supplier, ConsumerWithException<T, Cell> consumer,
final MutationProto proto, final CellScanner cellScanner) throws IOException {
byte[] row = proto.hasRow() ? proto.getRow().toByteArray() : null;
T mutation = row == null ? null : supplier.apply(new Bytes(row));
int cellCount = proto.hasAssociatedCellCount() ? proto.getAssociatedCellCount() : 0;
if (cellCount > 0) {
// The proto has metadata only and the data is separate to be found in the cellScanner.
if (cellScanner == null) {
throw new DoNotRetryIOException("Cell count of " + cellCount + " but no cellScanner: " +
toShortString(proto));
}
for (int i = 0; i < cellCount; i++) {
if (!cellScanner.advance()) {
throw new DoNotRetryIOException("Cell count of " + cellCount + " but at index " + i +
" no cell returned: " + toShortString(proto));
}
Cell cell = cellScanner.current();
if (mutation == null) {
mutation = supplier.apply(new Bytes(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
}
consumer.accept(mutation, cell);
}
} else {
if (mutation == null) {
throw new IllegalArgumentException("row cannot be null");
}
for (ColumnValue column : proto.getColumnValueList()) {
byte[] family = column.getFamily().toByteArray();
for (QualifierValue qv : column.getQualifierValueList()) {
byte[] qualifier = qv.getQualifier().toByteArray();
if (!qv.hasValue()) {
throw new DoNotRetryIOException(
"Missing required field: qualifier value");
}
byte[] value = qv.getValue().toByteArray();
byte[] tags = null;
if (qv.hasTags()) {
tags = qv.getTags().toByteArray();
}
consumer.accept(mutation, CellUtil.createCell(mutation.getRow(), family, qualifier, qv.getTimestamp(),
KeyValue.Type.Put, value, tags));
}
}
}
mutation.setDurability(toDurability(proto.getDurability()));
for (NameBytesPair attribute : proto.getAttributeList()) {
mutation.setAttribute(attribute.getName(), attribute.getValue().toByteArray());
}
return mutation;
}
/**
* Convert a protocol buffer Mutate to an Append
@ -750,56 +806,31 @@ public final class ProtobufUtil {
* @throws IOException
*/
public static Append toAppend(final MutationProto proto, final CellScanner cellScanner)
throws IOException {
throws IOException {
MutationType type = proto.getMutateType();
assert type == MutationType.APPEND : type.name();
byte[] row = proto.hasRow() ? proto.getRow().toByteArray() : null;
Append append = row != null ? new Append(row) : null;
int cellCount = proto.hasAssociatedCellCount()? proto.getAssociatedCellCount(): 0;
if (cellCount > 0) {
// The proto has metadata only and the data is separate to be found in the cellScanner.
if (cellScanner == null) {
throw new DoNotRetryIOException("Cell count of " + cellCount + " but no cellScanner: " +
toShortString(proto));
}
for (int i = 0; i < cellCount; i++) {
if (!cellScanner.advance()) {
throw new DoNotRetryIOException("Cell count of " + cellCount + " but at index " + i +
" no cell returned: " + toShortString(proto));
}
Cell cell = cellScanner.current();
if (append == null) {
append = new Append(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
}
append.add(cell);
}
} else {
if (append == null) {
throw new IllegalArgumentException("row cannot be null");
}
for (ColumnValue column: proto.getColumnValueList()) {
byte[] family = column.getFamily().toByteArray();
for (QualifierValue qv: column.getQualifierValueList()) {
byte[] qualifier = qv.getQualifier().toByteArray();
if (!qv.hasValue()) {
throw new DoNotRetryIOException(
"Missing required field: qualifier value");
}
byte[] value = qv.getValue().toByteArray();
byte[] tags = null;
if (qv.hasTags()) {
tags = qv.getTags().toByteArray();
}
append.add(CellUtil.createCell(row, family, qualifier, qv.getTimestamp(),
KeyValue.Type.Put, value, tags));
}
}
return toDelta((Bytes row) -> new Append(row.get(), row.getOffset(), row.getLength()),
Append::add, proto, cellScanner);
}
/**
* Convert a protocol buffer Mutate to an Increment
*
* @param proto the protocol buffer Mutate to convert
* @return the converted client Increment
* @throws IOException
*/
public static Increment toIncrement(final MutationProto proto, final CellScanner cellScanner)
throws IOException {
MutationType type = proto.getMutateType();
assert type == MutationType.INCREMENT : type.name();
Increment increment = toDelta((Bytes row) -> new Increment(row.get(), row.getOffset(), row.getLength()),
Increment::add, proto, cellScanner);
if (proto.hasTimeRange()) {
TimeRange timeRange = protoToTimeRange(proto.getTimeRange());
increment.setTimeRange(timeRange.getMin(), timeRange.getMax());
}
append.setDurability(toDurability(proto.getDurability()));
for (NameBytesPair attribute: proto.getAttributeList()) {
append.setAttribute(attribute.getName(), attribute.getValue().toByteArray());
}
return append;
return increment;
}
/**
@ -823,69 +854,6 @@ public final class ProtobufUtil {
throw new IOException("Unknown mutation type " + type);
}
/**
* Convert a protocol buffer Mutate to an Increment
*
* @param proto the protocol buffer Mutate to convert
* @return the converted client Increment
* @throws IOException
*/
public static Increment toIncrement(final MutationProto proto, final CellScanner cellScanner)
throws IOException {
MutationType type = proto.getMutateType();
assert type == MutationType.INCREMENT : type.name();
byte [] row = proto.hasRow()? proto.getRow().toByteArray(): null;
Increment increment = row != null ? new Increment(row) : null;
int cellCount = proto.hasAssociatedCellCount()? proto.getAssociatedCellCount(): 0;
if (cellCount > 0) {
// The proto has metadata only and the data is separate to be found in the cellScanner.
if (cellScanner == null) {
throw new DoNotRetryIOException("Cell count of " + cellCount + " but no cellScanner: " +
TextFormat.shortDebugString(proto));
}
for (int i = 0; i < cellCount; i++) {
if (!cellScanner.advance()) {
throw new DoNotRetryIOException("Cell count of " + cellCount + " but at index " + i +
" no cell returned: " + TextFormat.shortDebugString(proto));
}
Cell cell = cellScanner.current();
if (increment == null) {
increment = new Increment(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
}
increment.add(cell);
}
} else {
if (increment == null) {
throw new IllegalArgumentException("row cannot be null");
}
for (ColumnValue column: proto.getColumnValueList()) {
byte[] family = column.getFamily().toByteArray();
for (QualifierValue qv: column.getQualifierValueList()) {
byte[] qualifier = qv.getQualifier().toByteArray();
if (!qv.hasValue()) {
throw new DoNotRetryIOException("Missing required field: qualifier value");
}
byte[] value = qv.getValue().toByteArray();
byte[] tags = null;
if (qv.hasTags()) {
tags = qv.getTags().toByteArray();
}
increment.add(CellUtil.createCell(row, family, qualifier, qv.getTimestamp(),
KeyValue.Type.Put, value, tags));
}
}
}
if (proto.hasTimeRange()) {
TimeRange timeRange = protoToTimeRange(proto.getTimeRange());
increment.setTimeRange(timeRange.getMin(), timeRange.getMax());
}
increment.setDurability(toDurability(proto.getDurability()));
for (NameBytesPair attribute : proto.getAttributeList()) {
increment.setAttribute(attribute.getName(), attribute.getValue().toByteArray());
}
return increment;
}
/**
* Convert a protocol buffer Mutate to a Get.
* @param proto the protocol buffer Mutate to convert.
@ -1290,56 +1258,6 @@ public final class ProtobufUtil {
}
}
/**
* Convert a client Increment to a protobuf Mutate.
*
* @param increment
* @return the converted mutate
*/
public static MutationProto toMutation(
final Increment increment, final MutationProto.Builder builder, long nonce) {
builder.setRow(UnsafeByteOperations.unsafeWrap(increment.getRow()));
builder.setMutateType(MutationType.INCREMENT);
builder.setDurability(toDurability(increment.getDurability()));
if (nonce != HConstants.NO_NONCE) {
builder.setNonce(nonce);
}
TimeRange timeRange = increment.getTimeRange();
setTimeRange(builder, timeRange);
ColumnValue.Builder columnBuilder = ColumnValue.newBuilder();
QualifierValue.Builder valueBuilder = QualifierValue.newBuilder();
for (Map.Entry<byte[], List<Cell>> family: increment.getFamilyCellMap().entrySet()) {
columnBuilder.setFamily(UnsafeByteOperations.unsafeWrap(family.getKey()));
columnBuilder.clearQualifierValue();
List<Cell> values = family.getValue();
if (values != null && values.size() > 0) {
for (Cell cell: values) {
valueBuilder.clear();
valueBuilder.setQualifier(UnsafeByteOperations.unsafeWrap(
cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()));
valueBuilder.setValue(UnsafeByteOperations.unsafeWrap(
cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
if (cell.getTagsLength() > 0) {
valueBuilder.setTags(UnsafeByteOperations.unsafeWrap(cell.getTagsArray(),
cell.getTagsOffset(), cell.getTagsLength()));
}
columnBuilder.addQualifierValue(valueBuilder.build());
}
}
builder.addColumnValue(columnBuilder.build());
}
Map<String, byte[]> attributes = increment.getAttributesMap();
if (!attributes.isEmpty()) {
NameBytesPair.Builder attributeBuilder = NameBytesPair.newBuilder();
for (Map.Entry<String, byte[]> attribute : attributes.entrySet()) {
attributeBuilder.setName(attribute.getKey());
attributeBuilder.setValue(UnsafeByteOperations.unsafeWrap(attribute.getValue()));
builder.addAttribute(attributeBuilder.build());
}
}
return builder.build();
}
public static MutationProto toMutation(final MutationType type, final Mutation mutation)
throws IOException {
return toMutation(type, mutation, HConstants.NO_NONCE);
@ -1370,6 +1288,10 @@ public final class ProtobufUtil {
if (nonce != HConstants.NO_NONCE) {
builder.setNonce(nonce);
}
if (type == MutationType.INCREMENT) {
TimeRange timeRange = ((Increment) mutation).getTimeRange();
setTimeRange(builder, timeRange);
}
ColumnValue.Builder columnBuilder = ColumnValue.newBuilder();
QualifierValue.Builder valueBuilder = QualifierValue.newBuilder();
for (Map.Entry<byte[],List<Cell>> family: mutation.getFamilyCellMap().entrySet()) {

View File

@ -201,6 +201,7 @@ public final class RequestConverter {
valueBuilder.setValue(UnsafeByteOperations.unsafeWrap(Bytes.toBytes(amount)));
valueBuilder.setQualifier(UnsafeByteOperations
.unsafeWrap(qualifier == null ? HConstants.EMPTY_BYTE_ARRAY : qualifier));
valueBuilder.setTimestamp(HConstants.LATEST_TIMESTAMP);
columnBuilder.addQualifierValue(valueBuilder.build());
mutateBuilder.addColumnValue(columnBuilder.build());
if (nonce != HConstants.NO_NONCE) {
@ -364,7 +365,7 @@ public final class RequestConverter {
* @return a mutate request
*/
public static MutateRequest buildMutateRequest(final byte[] regionName,
final Increment increment, final long nonceGroup, final long nonce) {
final Increment increment, final long nonceGroup, final long nonce) throws IOException {
MutateRequest.Builder builder = MutateRequest.newBuilder();
RegionSpecifier region = buildRegionSpecifier(
RegionSpecifierType.REGION_NAME, regionName);
@ -372,7 +373,8 @@ public final class RequestConverter {
if (nonce != HConstants.NO_NONCE && nonceGroup != HConstants.NO_NONCE) {
builder.setNonceGroup(nonceGroup);
}
builder.setMutation(ProtobufUtil.toMutation(increment, MutationProto.newBuilder(), nonce));
builder.setMutation(ProtobufUtil.toMutation(MutationType.INCREMENT, increment,
MutationProto.newBuilder(), nonce));
return builder.build();
}
@ -649,8 +651,8 @@ public final class RequestConverter {
regionActionBuilder.addAction(actionBuilder.setMutation(ProtobufUtil.toMutation(
MutationType.APPEND, (Append)row, mutationBuilder, action.getNonce())));
} else if (row instanceof Increment) {
regionActionBuilder.addAction(actionBuilder.setMutation(
ProtobufUtil.toMutation((Increment)row, mutationBuilder, action.getNonce())));
regionActionBuilder.addAction(actionBuilder.setMutation(ProtobufUtil.toMutation(
MutationType.INCREMENT, (Increment)row, mutationBuilder, action.getNonce())));
} else if (row instanceof RegionCoprocessorServiceExec) {
RegionCoprocessorServiceExec exec = (RegionCoprocessorServiceExec) row;
// DUMB COPY!!! FIX!!! Done to copy from c.g.p.ByteString to shaded ByteString.

View File

@ -32,6 +32,7 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.reflect.Constructor;
import java.nio.ByteBuffer;
import java.text.ParseException;
import java.util.AbstractList;
import java.util.ArrayList;
@ -73,6 +74,7 @@ import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -173,6 +175,7 @@ import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.CollectionUtils;
import org.apache.hadoop.hbase.util.CompressionTest;
import org.apache.hadoop.hbase.util.EncryptionTest;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@ -7499,14 +7502,20 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
mutationType = MutationType.INCREMENT;
// If delta amount to apply is 0, don't write WAL or MemStore.
long deltaAmount = getLongValue(delta);
// TODO: Does zero value mean reset Cell? For example, the ttl.
apply = deltaAmount != 0;
newCell = reckonIncrement(delta, deltaAmount, currentValue, columnFamily, now,
(Increment)mutation);
final long newValue = currentValue == null ? deltaAmount : getLongValue(currentValue) + deltaAmount;
newCell = reckonDelta(delta, currentValue, columnFamily, now, mutation, (oldCell) -> Bytes.toBytes(newValue));
break;
case APPEND:
mutationType = MutationType.APPEND;
// Always apply Append. TODO: Does empty delta value mean reset Cell? It seems to.
newCell = reckonAppend(delta, currentValue, now, (Append)mutation);
newCell = reckonDelta(delta, currentValue, columnFamily, now, mutation, (oldCell) ->
ByteBuffer.wrap(new byte[delta.getValueLength() + oldCell.getValueLength()])
.put(oldCell.getValueArray(), oldCell.getValueOffset(), oldCell.getValueLength())
.put(delta.getValueArray(), delta.getValueOffset(), delta.getValueLength())
.array()
);
break;
default: throw new UnsupportedOperationException(op.toString());
}
@ -7528,82 +7537,26 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
return toApply;
}
/**
* Calculate new Increment Cell.
* @return New Increment Cell with delta applied to currentValue if currentValue is not null;
* otherwise, a new Cell with the delta set as its value.
*/
private Cell reckonIncrement(final Cell delta, final long deltaAmount, final Cell currentValue,
byte [] columnFamily, final long now, Mutation mutation)
throws IOException {
private static Cell reckonDelta(final Cell delta, final Cell currentCell,
final byte[] columnFamily, final long now,
Mutation mutation, Function<Cell, byte[]> supplier) throws IOException {
// Forward any tags found on the delta.
List<Tag> tags = TagUtil.carryForwardTags(delta);
long newValue = deltaAmount;
long ts = now;
if (currentValue != null) {
tags = TagUtil.carryForwardTags(tags, currentValue);
ts = Math.max(now, currentValue.getTimestamp() + 1);
newValue += getLongValue(currentValue);
}
// Now make up the new Cell. TODO: FIX. This is carnel knowledge of how KeyValues are made...
// doesn't work well with offheaping or if we are doing a different Cell type.
byte [] incrementAmountInBytes = Bytes.toBytes(newValue);
tags = TagUtil.carryForwardTTLTag(tags, mutation.getTTL());
byte [] row = mutation.getRow();
return new KeyValue(row, 0, row.length,
columnFamily, 0, columnFamily.length,
delta.getQualifierArray(), delta.getQualifierOffset(), delta.getQualifierLength(),
ts, KeyValue.Type.Put,
incrementAmountInBytes, 0, incrementAmountInBytes.length,
tags);
}
private Cell reckonAppend(final Cell delta, final Cell currentValue, final long now,
Append mutation)
throws IOException {
// Forward any tags found on the delta.
List<Tag> tags = TagUtil.carryForwardTags(delta);
long ts = now;
Cell newCell = null;
byte [] row = mutation.getRow();
if (currentValue != null) {
tags = TagUtil.carryForwardTags(tags, currentValue);
ts = Math.max(now, currentValue.getTimestamp() + 1);
tags = TagUtil.carryForwardTTLTag(tags, mutation.getTTL());
byte[] tagBytes = TagUtil.fromList(tags);
// Allocate an empty cell and copy in all parts.
// TODO: This is intimate knowledge of how a KeyValue is made. Undo!!! Prevents our doing
// other Cell types. Copying on-heap too if an off-heap Cell.
newCell = new KeyValue(row.length, delta.getFamilyLength(),
delta.getQualifierLength(), ts, KeyValue.Type.Put,
delta.getValueLength() + currentValue.getValueLength(),
tagBytes == null? 0: tagBytes.length);
// Copy in row, family, and qualifier
System.arraycopy(row, 0, newCell.getRowArray(), newCell.getRowOffset(), row.length);
System.arraycopy(delta.getFamilyArray(), delta.getFamilyOffset(),
newCell.getFamilyArray(), newCell.getFamilyOffset(), delta.getFamilyLength());
System.arraycopy(delta.getQualifierArray(), delta.getQualifierOffset(),
newCell.getQualifierArray(), newCell.getQualifierOffset(), delta.getQualifierLength());
// Copy in the value
CellUtil.copyValueTo(currentValue, newCell.getValueArray(), newCell.getValueOffset());
System.arraycopy(delta.getValueArray(), delta.getValueOffset(),
newCell.getValueArray(), newCell.getValueOffset() + currentValue.getValueLength(),
delta.getValueLength());
// Copy in tag data
if (tagBytes != null) {
System.arraycopy(tagBytes, 0,
newCell.getTagsArray(), newCell.getTagsOffset(), tagBytes.length);
}
if (currentCell != null) {
tags = TagUtil.carryForwardTags(tags, currentCell);
byte[] newValue = supplier.apply(currentCell);
// TODO: FIX. This is carnel knowledge of how KeyValues are made...
// This will be fixed by HBASE-18519
return new KeyValue(mutation.getRow(), 0, mutation.getRow().length,
columnFamily, 0, columnFamily.length,
delta.getQualifierArray(), delta.getQualifierOffset(), delta.getQualifierLength(),
Math.max(currentCell.getTimestamp() + 1, now),
KeyValue.Type.Put, newValue, 0, newValue.length, tags);
} else {
// Append's KeyValue.Type==Put and ts==HConstants.LATEST_TIMESTAMP
CellUtil.updateLatestStamp(delta, now);
newCell = delta;
tags = TagUtil.carryForwardTTLTag(tags, mutation.getTTL());
if (tags != null) {
newCell = CellUtil.createCell(delta, tags);
}
return CollectionUtils.isEmpty(tags) ? delta : CellUtil.createCell(delta, tags);
}
return newCell;
}
/**

View File

@ -0,0 +1,85 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import java.io.IOException;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
/**
* Run Append tests that use the HBase clients;
*/
@Category(LargeTests.class)
public class TestAppendFromClientSide {
protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static byte [] ROW = Bytes.toBytes("testRow");
private static byte [] FAMILY = Bytes.toBytes("testFamily");
private static byte [] QUALIFIER = Bytes.toBytes("testQualifier");
@Rule
public TestName name = new TestName();
@BeforeClass
public static void beforeClass() throws Exception {
Configuration conf = TEST_UTIL.getConfiguration();
TEST_UTIL.startMiniCluster(3);
}
/**
* @throws java.lang.Exception
*/
@AfterClass
public static void afterClass() throws Exception {
TEST_UTIL.shutdownMiniCluster();
}
@Test
public void testAppendWithCustomTimestamp() throws IOException {
TableName TABLENAME = TableName.valueOf(name.getMethodName());
Table table = TEST_UTIL.createTable(TABLENAME, FAMILY);
long timestamp = 999;
Append append = new Append(ROW);
append.add(CellUtil.createCell(ROW, FAMILY, QUALIFIER, timestamp, KeyValue.Type.Put.getCode(), Bytes.toBytes(100L)));
Result r = table.append(append);
assertEquals(1, r.size());
assertEquals(timestamp, r.rawCells()[0].getTimestamp());
r = table.get(new Get(ROW));
assertEquals(1, r.size());
assertEquals(timestamp, r.rawCells()[0].getTimestamp());
r = table.append(append);
assertEquals(1, r.size());
assertNotEquals(timestamp, r.rawCells()[0].getTimestamp());
r = table.get(new Get(ROW));
assertEquals(1, r.size());
assertNotEquals(timestamp, r.rawCells()[0].getTimestamp());
}
}

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@ -33,10 +34,12 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
@ -462,6 +465,26 @@ public class TestIncrementsFromClientSide {
}
}
@Test
public void testIncrementWithCustomTimestamp() throws IOException {
TableName TABLENAME = TableName.valueOf(name.getMethodName());
Table table = TEST_UTIL.createTable(TABLENAME, FAMILY);
long timestamp = 999;
Increment increment = new Increment(ROW);
increment.add(CellUtil.createCell(ROW, FAMILY, QUALIFIER, timestamp, KeyValue.Type.Put.getCode(), Bytes.toBytes(100L)));
Result r = table.increment(increment);
assertEquals(1, r.size());
assertEquals(timestamp, r.rawCells()[0].getTimestamp());
r = table.get(new Get(ROW));
assertEquals(1, r.size());
assertEquals(timestamp, r.rawCells()[0].getTimestamp());
r = table.increment(increment);
assertEquals(1, r.size());
assertNotEquals(timestamp, r.rawCells()[0].getTimestamp());
r = table.get(new Get(ROW));
assertEquals(1, r.size());
assertNotEquals(timestamp, r.rawCells()[0].getTimestamp());
}
/**
* Call over to the adjacent class's method of same name.

View File

@ -27,7 +27,6 @@ import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.ProcedureInfo;
import org.apache.hadoop.hbase.ProcedureState;
@ -130,7 +129,6 @@ public class TestProtobufUtil {
qualifierBuilder.setQualifier(ByteString.copyFromUtf8("c2"));
qualifierBuilder.setValue(ByteString.copyFromUtf8("v2"));
valueBuilder.addQualifierValue(qualifierBuilder.build());
qualifierBuilder.setTimestamp(timeStamp);
mutateBuilder.addColumnValue(valueBuilder.build());
MutationProto proto = mutateBuilder.build();
@ -203,6 +201,7 @@ public class TestProtobufUtil {
*/
@Test
public void testIncrement() throws IOException {
long timeStamp = 111111;
MutationProto.Builder mutateBuilder = MutationProto.newBuilder();
mutateBuilder.setRow(ByteString.copyFromUtf8("row"));
mutateBuilder.setMutateType(MutationType.INCREMENT);
@ -211,6 +210,7 @@ public class TestProtobufUtil {
QualifierValue.Builder qualifierBuilder = QualifierValue.newBuilder();
qualifierBuilder.setQualifier(ByteString.copyFromUtf8("c1"));
qualifierBuilder.setValue(ByteString.copyFrom(Bytes.toBytes(11L)));
qualifierBuilder.setTimestamp(timeStamp);
valueBuilder.addQualifierValue(qualifierBuilder.build());
qualifierBuilder.setQualifier(ByteString.copyFromUtf8("c2"));
qualifierBuilder.setValue(ByteString.copyFrom(Bytes.toBytes(22L)));
@ -226,8 +226,8 @@ public class TestProtobufUtil {
mutateBuilder.setDurability(MutationProto.Durability.USE_DEFAULT);
Increment increment = ProtobufUtil.toIncrement(proto, null);
assertEquals(mutateBuilder.build(),
ProtobufUtil.toMutation(increment, MutationProto.newBuilder(), HConstants.NO_NONCE));
mutateBuilder.setTimestamp(increment.getTimeStamp());
assertEquals(mutateBuilder.build(), ProtobufUtil.toMutation(MutationType.INCREMENT, increment));
}
/**

View File

@ -22,12 +22,22 @@ import static org.junit.Assert.fail;
import org.apache.hadoop.hbase.ProcedureInfo;
import org.apache.hadoop.hbase.ProcedureState;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.procedure2.LockInfo;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.ColumnValue;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.ColumnValue.QualifierValue;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType;
import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import java.io.IOException;
@Category(SmallTests.class)
public class TestProtobufUtil {
public TestProtobufUtil() {
@ -148,4 +158,80 @@ public class TestProtobufUtil {
assertWaitingProcedureEquals(waitingProcedure, waitingProcedure2);
}
/**
* Test Increment Mutate conversions.
*
* @throws IOException
*/
@Test
public void testIncrement() throws IOException {
long timeStamp = 111111;
MutationProto.Builder mutateBuilder = MutationProto.newBuilder();
mutateBuilder.setRow(ByteString.copyFromUtf8("row"));
mutateBuilder.setMutateType(MutationProto.MutationType.INCREMENT);
ColumnValue.Builder valueBuilder = ColumnValue.newBuilder();
valueBuilder.setFamily(ByteString.copyFromUtf8("f1"));
QualifierValue.Builder qualifierBuilder = QualifierValue.newBuilder();
qualifierBuilder.setQualifier(ByteString.copyFromUtf8("c1"));
qualifierBuilder.setValue(ByteString.copyFrom(Bytes.toBytes(11L)));
qualifierBuilder.setTimestamp(timeStamp);
valueBuilder.addQualifierValue(qualifierBuilder.build());
qualifierBuilder.setQualifier(ByteString.copyFromUtf8("c2"));
qualifierBuilder.setValue(ByteString.copyFrom(Bytes.toBytes(22L)));
valueBuilder.addQualifierValue(qualifierBuilder.build());
mutateBuilder.addColumnValue(valueBuilder.build());
MutationProto proto = mutateBuilder.build();
// default fields
assertEquals(MutationProto.Durability.USE_DEFAULT, proto.getDurability());
// set the default value for equal comparison
mutateBuilder = MutationProto.newBuilder(proto);
mutateBuilder.setDurability(MutationProto.Durability.USE_DEFAULT);
Increment increment = ProtobufUtil.toIncrement(proto, null);
mutateBuilder.setTimestamp(increment.getTimeStamp());
assertEquals(mutateBuilder.build(), ProtobufUtil.toMutation(MutationType.INCREMENT, increment));
}
/**
* Test Append Mutate conversions.
*
* @throws IOException
*/
@Test
public void testAppend() throws IOException {
long timeStamp = 111111;
MutationProto.Builder mutateBuilder = MutationProto.newBuilder();
mutateBuilder.setRow(ByteString.copyFromUtf8("row"));
mutateBuilder.setMutateType(MutationType.APPEND);
mutateBuilder.setTimestamp(timeStamp);
ColumnValue.Builder valueBuilder = ColumnValue.newBuilder();
valueBuilder.setFamily(ByteString.copyFromUtf8("f1"));
QualifierValue.Builder qualifierBuilder = QualifierValue.newBuilder();
qualifierBuilder.setQualifier(ByteString.copyFromUtf8("c1"));
qualifierBuilder.setValue(ByteString.copyFromUtf8("v1"));
qualifierBuilder.setTimestamp(timeStamp);
valueBuilder.addQualifierValue(qualifierBuilder.build());
qualifierBuilder.setQualifier(ByteString.copyFromUtf8("c2"));
qualifierBuilder.setValue(ByteString.copyFromUtf8("v2"));
valueBuilder.addQualifierValue(qualifierBuilder.build());
mutateBuilder.addColumnValue(valueBuilder.build());
MutationProto proto = mutateBuilder.build();
// default fields
assertEquals(MutationProto.Durability.USE_DEFAULT, proto.getDurability());
// set the default value for equal comparison
mutateBuilder = MutationProto.newBuilder(proto);
mutateBuilder.setDurability(MutationProto.Durability.USE_DEFAULT);
Append append = ProtobufUtil.toAppend(proto, null);
// append always use the latest timestamp,
// reset the timestamp to the original mutate
mutateBuilder.setTimestamp(append.getTimeStamp());
assertEquals(mutateBuilder.build(), ProtobufUtil.toMutation(MutationType.APPEND, append));
}
}