HBASE-19427 Add TimeRange support into Append to optimize for counters
This commit is contained in:
parent
941acc5c05
commit
55eb59e654
|
@ -24,9 +24,11 @@ import java.util.UUID;
|
||||||
import org.apache.hadoop.hbase.Cell;
|
import org.apache.hadoop.hbase.Cell;
|
||||||
import org.apache.hadoop.hbase.CellUtil;
|
import org.apache.hadoop.hbase.CellUtil;
|
||||||
import org.apache.hadoop.hbase.KeyValue;
|
import org.apache.hadoop.hbase.KeyValue;
|
||||||
|
import org.apache.hadoop.hbase.io.TimeRange;
|
||||||
import org.apache.hadoop.hbase.security.access.Permission;
|
import org.apache.hadoop.hbase.security.access.Permission;
|
||||||
import org.apache.hadoop.hbase.security.visibility.CellVisibility;
|
import org.apache.hadoop.hbase.security.visibility.CellVisibility;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.hbase.util.ClassSize;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -42,6 +44,42 @@ import org.apache.yetus.audience.InterfaceAudience;
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Public
|
@InterfaceAudience.Public
|
||||||
public class Append extends Mutation {
|
public class Append extends Mutation {
|
||||||
|
private static final long HEAP_OVERHEAD = ClassSize.REFERENCE + ClassSize.TIMERANGE;
|
||||||
|
private TimeRange tr = new TimeRange();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets the TimeRange to be used on the Get for this append.
|
||||||
|
* <p>
|
||||||
|
* This is useful for when you have counters that only last for specific
|
||||||
|
* periods of time (ie. counters that are partitioned by time). By setting
|
||||||
|
* the range of valid times for this append, you can potentially gain
|
||||||
|
* some performance with a more optimal Get operation.
|
||||||
|
* Be careful adding the time range to this class as you will update the old cell if the
|
||||||
|
* time range doesn't include the latest cells.
|
||||||
|
* <p>
|
||||||
|
* This range is used as [minStamp, maxStamp).
|
||||||
|
* @param minStamp minimum timestamp value, inclusive
|
||||||
|
* @param maxStamp maximum timestamp value, exclusive
|
||||||
|
* @return this
|
||||||
|
*/
|
||||||
|
public Append setTimeRange(long minStamp, long maxStamp) {
|
||||||
|
tr = new TimeRange(minStamp, maxStamp);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Gets the TimeRange used for this append.
|
||||||
|
* @return TimeRange
|
||||||
|
*/
|
||||||
|
public TimeRange getTimeRange() {
|
||||||
|
return this.tr;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected long extraHeapSize(){
|
||||||
|
return HEAP_OVERHEAD;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param returnResults
|
* @param returnResults
|
||||||
* True (default) if the append operation should return the results.
|
* True (default) if the append operation should return the results.
|
||||||
|
@ -77,6 +115,7 @@ public class Append extends Mutation {
|
||||||
public Append(Append a) {
|
public Append(Append a) {
|
||||||
this.row = a.getRow();
|
this.row = a.getRow();
|
||||||
this.ts = a.getTimeStamp();
|
this.ts = a.getTimeStamp();
|
||||||
|
this.tr = a.getTimeRange();
|
||||||
this.familyMap.putAll(a.getFamilyCellMap());
|
this.familyMap.putAll(a.getFamilyCellMap());
|
||||||
for (Map.Entry<String, byte[]> entry : a.getAttributesMap().entrySet()) {
|
for (Map.Entry<String, byte[]> entry : a.getAttributesMap().entrySet()) {
|
||||||
this.setAttribute(entry.getKey(), entry.getValue());
|
this.setAttribute(entry.getKey(), entry.getValue());
|
||||||
|
|
|
@ -138,6 +138,8 @@ public class Increment extends Mutation implements Comparable<Row> {
|
||||||
* periods of time (ie. counters that are partitioned by time). By setting
|
* periods of time (ie. counters that are partitioned by time). By setting
|
||||||
* the range of valid times for this increment, you can potentially gain
|
* the range of valid times for this increment, you can potentially gain
|
||||||
* some performance with a more optimal Get operation.
|
* some performance with a more optimal Get operation.
|
||||||
|
* Be careful adding the time range to this class as you will update the old cell if the
|
||||||
|
* time range doesn't include the latest cells.
|
||||||
* <p>
|
* <p>
|
||||||
* This range is used as [minStamp, maxStamp).
|
* This range is used as [minStamp, maxStamp).
|
||||||
* @param minStamp minimum timestamp value, inclusive
|
* @param minStamp minimum timestamp value, inclusive
|
||||||
|
|
|
@ -715,8 +715,13 @@ public final class ProtobufUtil {
|
||||||
throws IOException {
|
throws IOException {
|
||||||
MutationType type = proto.getMutateType();
|
MutationType type = proto.getMutateType();
|
||||||
assert type == MutationType.APPEND : type.name();
|
assert type == MutationType.APPEND : type.name();
|
||||||
return toDelta((Bytes row) -> new Append(row.get(), row.getOffset(), row.getLength()),
|
Append append = toDelta((Bytes row) -> new Append(row.get(), row.getOffset(), row.getLength()),
|
||||||
Append::add, proto, cellScanner);
|
Append::add, proto, cellScanner);
|
||||||
|
if (proto.hasTimeRange()) {
|
||||||
|
TimeRange timeRange = protoToTimeRange(proto.getTimeRange());
|
||||||
|
append.setTimeRange(timeRange.getMin(), timeRange.getMax());
|
||||||
|
}
|
||||||
|
return append;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -1176,6 +1181,10 @@ public final class ProtobufUtil {
|
||||||
TimeRange timeRange = ((Increment) mutation).getTimeRange();
|
TimeRange timeRange = ((Increment) mutation).getTimeRange();
|
||||||
setTimeRange(builder, timeRange);
|
setTimeRange(builder, timeRange);
|
||||||
}
|
}
|
||||||
|
if (type == MutationType.APPEND) {
|
||||||
|
TimeRange timeRange = ((Append) mutation).getTimeRange();
|
||||||
|
setTimeRange(builder, timeRange);
|
||||||
|
}
|
||||||
ColumnValue.Builder columnBuilder = ColumnValue.newBuilder();
|
ColumnValue.Builder columnBuilder = ColumnValue.newBuilder();
|
||||||
QualifierValue.Builder valueBuilder = QualifierValue.newBuilder();
|
QualifierValue.Builder valueBuilder = QualifierValue.newBuilder();
|
||||||
for (Map.Entry<byte[],List<Cell>> family: mutation.getFamilyCellMap().entrySet()) {
|
for (Map.Entry<byte[],List<Cell>> family: mutation.getFamilyCellMap().entrySet()) {
|
||||||
|
@ -1234,6 +1243,9 @@ public final class ProtobufUtil {
|
||||||
if (mutation instanceof Increment) {
|
if (mutation instanceof Increment) {
|
||||||
setTimeRange(builder, ((Increment)mutation).getTimeRange());
|
setTimeRange(builder, ((Increment)mutation).getTimeRange());
|
||||||
}
|
}
|
||||||
|
if (mutation instanceof Append) {
|
||||||
|
setTimeRange(builder, ((Append)mutation).getTimeRange());
|
||||||
|
}
|
||||||
if (nonce != HConstants.NO_NONCE) {
|
if (nonce != HConstants.NO_NONCE) {
|
||||||
builder.setNonce(nonce);
|
builder.setNonce(nonce);
|
||||||
}
|
}
|
||||||
|
|
|
@ -856,8 +856,13 @@ public final class ProtobufUtil {
|
||||||
throws IOException {
|
throws IOException {
|
||||||
MutationType type = proto.getMutateType();
|
MutationType type = proto.getMutateType();
|
||||||
assert type == MutationType.APPEND : type.name();
|
assert type == MutationType.APPEND : type.name();
|
||||||
return toDelta((Bytes row) -> new Append(row.get(), row.getOffset(), row.getLength()),
|
Append append = toDelta((Bytes row) -> new Append(row.get(), row.getOffset(), row.getLength()),
|
||||||
Append::add, proto, cellScanner);
|
Append::add, proto, cellScanner);
|
||||||
|
if (proto.hasTimeRange()) {
|
||||||
|
TimeRange timeRange = protoToTimeRange(proto.getTimeRange());
|
||||||
|
append.setTimeRange(timeRange.getMin(), timeRange.getMax());
|
||||||
|
}
|
||||||
|
return append;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -1340,6 +1345,10 @@ public final class ProtobufUtil {
|
||||||
TimeRange timeRange = ((Increment) mutation).getTimeRange();
|
TimeRange timeRange = ((Increment) mutation).getTimeRange();
|
||||||
setTimeRange(builder, timeRange);
|
setTimeRange(builder, timeRange);
|
||||||
}
|
}
|
||||||
|
if (type == MutationType.APPEND) {
|
||||||
|
TimeRange timeRange = ((Append) mutation).getTimeRange();
|
||||||
|
setTimeRange(builder, timeRange);
|
||||||
|
}
|
||||||
ColumnValue.Builder columnBuilder = ColumnValue.newBuilder();
|
ColumnValue.Builder columnBuilder = ColumnValue.newBuilder();
|
||||||
QualifierValue.Builder valueBuilder = QualifierValue.newBuilder();
|
QualifierValue.Builder valueBuilder = QualifierValue.newBuilder();
|
||||||
for (Map.Entry<byte[],List<Cell>> family: mutation.getFamilyCellMap().entrySet()) {
|
for (Map.Entry<byte[],List<Cell>> family: mutation.getFamilyCellMap().entrySet()) {
|
||||||
|
@ -1398,6 +1407,9 @@ public final class ProtobufUtil {
|
||||||
if (mutation instanceof Increment) {
|
if (mutation instanceof Increment) {
|
||||||
setTimeRange(builder, ((Increment)mutation).getTimeRange());
|
setTimeRange(builder, ((Increment)mutation).getTimeRange());
|
||||||
}
|
}
|
||||||
|
if (mutation instanceof Append) {
|
||||||
|
setTimeRange(builder, ((Append)mutation).getTimeRange());
|
||||||
|
}
|
||||||
if (nonce != HConstants.NO_NONCE) {
|
if (nonce != HConstants.NO_NONCE) {
|
||||||
builder.setNonce(nonce);
|
builder.setNonce(nonce);
|
||||||
}
|
}
|
||||||
|
|
|
@ -7672,9 +7672,18 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||||
byte[] columnFamily = store.getColumnFamilyDescriptor().getName();
|
byte[] columnFamily = store.getColumnFamilyDescriptor().getName();
|
||||||
List<Cell> toApply = new ArrayList<>(deltas.size());
|
List<Cell> toApply = new ArrayList<>(deltas.size());
|
||||||
// Get previous values for all columns in this family.
|
// Get previous values for all columns in this family.
|
||||||
List<Cell> currentValues = get(mutation, store, deltas,
|
TimeRange tr = null;
|
||||||
null/*Default IsolationLevel*/,
|
switch (op) {
|
||||||
op == Operation.INCREMENT? ((Increment)mutation).getTimeRange(): null);
|
case INCREMENT:
|
||||||
|
tr = ((Increment)mutation).getTimeRange();
|
||||||
|
break;
|
||||||
|
case APPEND:
|
||||||
|
tr = ((Append)mutation).getTimeRange();
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
List<Cell> currentValues = get(mutation, store, deltas,null, tr);
|
||||||
// Iterate the input columns and update existing values if they were found, otherwise
|
// Iterate the input columns and update existing values if they were found, otherwise
|
||||||
// add new column initialized to the delta amount
|
// add new column initialized to the delta amount
|
||||||
int currentValuesIndex = 0;
|
int currentValuesIndex = 0;
|
||||||
|
|
|
@ -1952,7 +1952,7 @@ public class AccessController implements MasterCoprocessor, RegionCoprocessor,
|
||||||
AuthResult authResult = null;
|
AuthResult authResult = null;
|
||||||
User user = getActiveUser(c);
|
User user = getActiveUser(c);
|
||||||
if (checkCoveringPermission(user, OpType.APPEND, c.getEnvironment(), append.getRow(),
|
if (checkCoveringPermission(user, OpType.APPEND, c.getEnvironment(), append.getRow(),
|
||||||
append.getFamilyCellMap(), HConstants.LATEST_TIMESTAMP, Action.WRITE)) {
|
append.getFamilyCellMap(), append.getTimeRange().getMax(), Action.WRITE)) {
|
||||||
authResult = AuthResult.allow(OpType.APPEND.toString(), "Covering cell set",
|
authResult = AuthResult.allow(OpType.APPEND.toString(), "Covering cell set",
|
||||||
user, Action.WRITE, table, append.getFamilyCellMap());
|
user, Action.WRITE, table, append.getFamilyCellMap());
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -0,0 +1,159 @@
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.hbase.coprocessor;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertArrayEquals;
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.NavigableMap;
|
||||||
|
import java.util.Optional;
|
||||||
|
import org.apache.hadoop.hbase.Cell;
|
||||||
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.client.Append;
|
||||||
|
import org.apache.hadoop.hbase.client.Get;
|
||||||
|
import org.apache.hadoop.hbase.client.Put;
|
||||||
|
import org.apache.hadoop.hbase.client.Result;
|
||||||
|
import org.apache.hadoop.hbase.client.Row;
|
||||||
|
import org.apache.hadoop.hbase.client.Table;
|
||||||
|
import org.apache.hadoop.hbase.io.TimeRange;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||||
|
import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
|
||||||
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.Rule;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
import org.junit.rules.TestName;
|
||||||
|
|
||||||
|
@Category({CoprocessorTests.class, MediumTests.class})
|
||||||
|
public class TestAppendTimeRange {
|
||||||
|
|
||||||
|
@Rule
|
||||||
|
public TestName name = new TestName();
|
||||||
|
|
||||||
|
private static final HBaseTestingUtility util = new HBaseTestingUtility();
|
||||||
|
private static final ManualEnvironmentEdge mee = new ManualEnvironmentEdge();
|
||||||
|
|
||||||
|
private static final byte[] TEST_FAMILY = Bytes.toBytes("f1");
|
||||||
|
|
||||||
|
private static final byte[] ROW = Bytes.toBytes("aaa");
|
||||||
|
|
||||||
|
private static final byte[] QUAL = Bytes.toBytes("col1");
|
||||||
|
|
||||||
|
private static final byte[] VALUE = Bytes.toBytes("1");
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setupBeforeClass() throws Exception {
|
||||||
|
util.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
|
||||||
|
MyObserver.class.getName());
|
||||||
|
// Make general delay zero rather than default. Timing is off in this
|
||||||
|
// test that depends on an evironment edge that is manually moved forward.
|
||||||
|
util.getConfiguration().setInt(RemoteProcedureDispatcher.DISPATCH_DELAY_CONF_KEY, 0);
|
||||||
|
util.startMiniCluster();
|
||||||
|
EnvironmentEdgeManager.injectEdge(mee);
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterClass
|
||||||
|
public static void tearDownAfterClass() throws Exception {
|
||||||
|
util.shutdownMiniCluster();
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class MyObserver implements RegionCoprocessor, RegionObserver {
|
||||||
|
private static TimeRange tr10 = null;
|
||||||
|
private static TimeRange tr2 = null;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Optional<RegionObserver> getRegionObserver() {
|
||||||
|
return Optional.of(this);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Result preAppend(final ObserverContext<RegionCoprocessorEnvironment> e,
|
||||||
|
final Append append) throws IOException {
|
||||||
|
NavigableMap<byte [], List<Cell>> map = append.getFamilyCellMap();
|
||||||
|
for (Map.Entry<byte [], List<Cell>> entry : map.entrySet()) {
|
||||||
|
for (Cell cell : entry.getValue()) {
|
||||||
|
String appendStr = Bytes.toString(cell.getValueArray(), cell.getValueOffset(),
|
||||||
|
cell.getValueLength());
|
||||||
|
if (appendStr.equals("b")) {
|
||||||
|
tr10 = append.getTimeRange();
|
||||||
|
} else if (appendStr.equals("c") && !append.getTimeRange().isAllTime()) {
|
||||||
|
tr2 = append.getTimeRange();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testHTableInterfaceMethods() throws Exception {
|
||||||
|
try (Table table = util.createTable(TableName.valueOf(name.getMethodName()), TEST_FAMILY)) {
|
||||||
|
table.put(new Put(ROW).addColumn(TEST_FAMILY, QUAL, VALUE));
|
||||||
|
long time = EnvironmentEdgeManager.currentTime();
|
||||||
|
mee.setValue(time);
|
||||||
|
table.put(new Put(ROW).addColumn(TEST_FAMILY, QUAL, Bytes.toBytes("a")));
|
||||||
|
checkRowValue(table, ROW, Bytes.toBytes("a"));
|
||||||
|
|
||||||
|
time = EnvironmentEdgeManager.currentTime();
|
||||||
|
mee.setValue(time);
|
||||||
|
TimeRange range10 = new TimeRange(1, time + 10);
|
||||||
|
Result r = table.append(new Append(ROW).addColumn(TEST_FAMILY, QUAL, Bytes.toBytes("b"))
|
||||||
|
.setTimeRange(range10.getMin(), range10.getMax()));
|
||||||
|
checkRowValue(table, ROW, Bytes.toBytes("ab"));
|
||||||
|
assertEquals(MyObserver.tr10.getMin(), range10.getMin());
|
||||||
|
assertEquals(MyObserver.tr10.getMax(), range10.getMax());
|
||||||
|
time = EnvironmentEdgeManager.currentTime();
|
||||||
|
mee.setValue(time);
|
||||||
|
TimeRange range2 = new TimeRange(1, time+20);
|
||||||
|
List<Row> actions =
|
||||||
|
Arrays.asList(new Row[] {
|
||||||
|
new Append(ROW).addColumn(TEST_FAMILY, QUAL, Bytes.toBytes("c"))
|
||||||
|
.setTimeRange(range2.getMin(), range2.getMax()),
|
||||||
|
new Append(ROW).addColumn(TEST_FAMILY, QUAL, Bytes.toBytes("c"))
|
||||||
|
.setTimeRange(range2.getMin(), range2.getMax()) });
|
||||||
|
Object[] results1 = new Object[actions.size()];
|
||||||
|
table.batch(actions, results1);
|
||||||
|
assertEquals(MyObserver.tr2.getMin(), range2.getMin());
|
||||||
|
assertEquals(MyObserver.tr2.getMax(), range2.getMax());
|
||||||
|
for (Object r2 : results1) {
|
||||||
|
assertTrue(r2 instanceof Result);
|
||||||
|
}
|
||||||
|
checkRowValue(table, ROW, Bytes.toBytes("abcc"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void checkRowValue(Table table, byte[] row, byte[] expectedValue) throws IOException {
|
||||||
|
Get get = new Get(row).addColumn(TEST_FAMILY, QUAL);
|
||||||
|
Result result = table.get(get);
|
||||||
|
byte[] actualValue = result.getValue(TEST_FAMILY, QUAL);
|
||||||
|
assertArrayEquals(expectedValue, actualValue);
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue