diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Append.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Append.java index 89ea082135b..da07ea6ed08 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Append.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Append.java @@ -24,9 +24,11 @@ import java.util.UUID; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.security.access.Permission; import org.apache.hadoop.hbase.security.visibility.CellVisibility; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.ClassSize; import org.apache.yetus.audience.InterfaceAudience; /** @@ -42,6 +44,42 @@ import org.apache.yetus.audience.InterfaceAudience; */ @InterfaceAudience.Public public class Append extends Mutation { + private static final long HEAP_OVERHEAD = ClassSize.REFERENCE + ClassSize.TIMERANGE; + private TimeRange tr = new TimeRange(); + + /** + * Sets the TimeRange to be used on the Get for this append. + *

+ * This is useful for when you have counters that only last for specific + * periods of time (ie. counters that are partitioned by time). By setting + * the range of valid times for this append, you can potentially gain + * some performance with a more optimal Get operation. + * Be careful adding the time range to this class as you will update the old cell if the + * time range doesn't include the latest cells. + *

+ * This range is used as [minStamp, maxStamp). + * @param minStamp minimum timestamp value, inclusive + * @param maxStamp maximum timestamp value, exclusive + * @return this + */ + public Append setTimeRange(long minStamp, long maxStamp) { + tr = new TimeRange(minStamp, maxStamp); + return this; + } + + /** + * Gets the TimeRange used for this append. + * @return TimeRange + */ + public TimeRange getTimeRange() { + return this.tr; + } + + @Override + protected long extraHeapSize(){ + return HEAP_OVERHEAD; + } + /** * @param returnResults * True (default) if the append operation should return the results. @@ -77,6 +115,7 @@ public class Append extends Mutation { public Append(Append a) { this.row = a.getRow(); this.ts = a.getTimeStamp(); + this.tr = a.getTimeRange(); this.familyMap.putAll(a.getFamilyCellMap()); for (Map.Entry entry : a.getAttributesMap().entrySet()) { this.setAttribute(entry.getKey(), entry.getValue()); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java index 2f8dd26f640..52c0c598970 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java @@ -138,6 +138,8 @@ public class Increment extends Mutation implements Comparable { * periods of time (ie. counters that are partitioned by time). By setting * the range of valid times for this increment, you can potentially gain * some performance with a more optimal Get operation. + * Be careful adding the time range to this class as you will update the old cell if the + * time range doesn't include the latest cells. *

* This range is used as [minStamp, maxStamp). * @param minStamp minimum timestamp value, inclusive diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java index 92169966be2..309fb064330 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java @@ -715,8 +715,13 @@ public final class ProtobufUtil { throws IOException { MutationType type = proto.getMutateType(); assert type == MutationType.APPEND : type.name(); - return toDelta((Bytes row) -> new Append(row.get(), row.getOffset(), row.getLength()), - Append::add, proto, cellScanner); + Append append = toDelta((Bytes row) -> new Append(row.get(), row.getOffset(), row.getLength()), + Append::add, proto, cellScanner); + if (proto.hasTimeRange()) { + TimeRange timeRange = protoToTimeRange(proto.getTimeRange()); + append.setTimeRange(timeRange.getMin(), timeRange.getMax()); + } + return append; } /** @@ -1176,6 +1181,10 @@ public final class ProtobufUtil { TimeRange timeRange = ((Increment) mutation).getTimeRange(); setTimeRange(builder, timeRange); } + if (type == MutationType.APPEND) { + TimeRange timeRange = ((Append) mutation).getTimeRange(); + setTimeRange(builder, timeRange); + } ColumnValue.Builder columnBuilder = ColumnValue.newBuilder(); QualifierValue.Builder valueBuilder = QualifierValue.newBuilder(); for (Map.Entry> family: mutation.getFamilyCellMap().entrySet()) { @@ -1234,6 +1243,9 @@ public final class ProtobufUtil { if (mutation instanceof Increment) { setTimeRange(builder, ((Increment)mutation).getTimeRange()); } + if (mutation instanceof Append) { + setTimeRange(builder, ((Append)mutation).getTimeRange()); + } if (nonce != HConstants.NO_NONCE) { builder.setNonce(nonce); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java index 5971b3c7a28..f856c7eb062 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java @@ -856,8 +856,13 @@ public final class ProtobufUtil { throws IOException { MutationType type = proto.getMutateType(); assert type == MutationType.APPEND : type.name(); - return toDelta((Bytes row) -> new Append(row.get(), row.getOffset(), row.getLength()), + Append append = toDelta((Bytes row) -> new Append(row.get(), row.getOffset(), row.getLength()), Append::add, proto, cellScanner); + if (proto.hasTimeRange()) { + TimeRange timeRange = protoToTimeRange(proto.getTimeRange()); + append.setTimeRange(timeRange.getMin(), timeRange.getMax()); + } + return append; } /** @@ -1340,6 +1345,10 @@ public final class ProtobufUtil { TimeRange timeRange = ((Increment) mutation).getTimeRange(); setTimeRange(builder, timeRange); } + if (type == MutationType.APPEND) { + TimeRange timeRange = ((Append) mutation).getTimeRange(); + setTimeRange(builder, timeRange); + } ColumnValue.Builder columnBuilder = ColumnValue.newBuilder(); QualifierValue.Builder valueBuilder = QualifierValue.newBuilder(); for (Map.Entry> family: mutation.getFamilyCellMap().entrySet()) { @@ -1398,6 +1407,9 @@ public final class ProtobufUtil { if (mutation instanceof Increment) { setTimeRange(builder, ((Increment)mutation).getTimeRange()); } + if (mutation instanceof Append) { + setTimeRange(builder, ((Append)mutation).getTimeRange()); + } if (nonce != HConstants.NO_NONCE) { builder.setNonce(nonce); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 85c12e9433a..8ca11848e71 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -7672,9 +7672,18 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi byte[] columnFamily = store.getColumnFamilyDescriptor().getName(); List toApply = new ArrayList<>(deltas.size()); // Get previous values for all columns in this family. - List currentValues = get(mutation, store, deltas, - null/*Default IsolationLevel*/, - op == Operation.INCREMENT? ((Increment)mutation).getTimeRange(): null); + TimeRange tr = null; + switch (op) { + case INCREMENT: + tr = ((Increment)mutation).getTimeRange(); + break; + case APPEND: + tr = ((Append)mutation).getTimeRange(); + break; + default: + break; + } + List currentValues = get(mutation, store, deltas,null, tr); // Iterate the input columns and update existing values if they were found, otherwise // add new column initialized to the delta amount int currentValuesIndex = 0; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java index 2e2d263ee3a..0f9d8a5a8f8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java @@ -1952,7 +1952,7 @@ public class AccessController implements MasterCoprocessor, RegionCoprocessor, AuthResult authResult = null; User user = getActiveUser(c); if (checkCoveringPermission(user, OpType.APPEND, c.getEnvironment(), append.getRow(), - append.getFamilyCellMap(), HConstants.LATEST_TIMESTAMP, Action.WRITE)) { + append.getFamilyCellMap(), append.getTimeRange().getMax(), Action.WRITE)) { authResult = AuthResult.allow(OpType.APPEND.toString(), "Covering cell set", user, Action.WRITE, table, append.getFamilyCellMap()); } else { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAppendTimeRange.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAppendTimeRange.java new file mode 100644 index 00000000000..88420758012 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAppendTimeRange.java @@ -0,0 +1,159 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.coprocessor; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.Optional; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Append; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Row; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.io.TimeRange; +import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher; +import org.apache.hadoop.hbase.testclassification.CoprocessorTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.ManualEnvironmentEdge; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +@Category({CoprocessorTests.class, MediumTests.class}) +public class TestAppendTimeRange { + + @Rule + public TestName name = new TestName(); + + private static final HBaseTestingUtility util = new HBaseTestingUtility(); + private static final ManualEnvironmentEdge mee = new ManualEnvironmentEdge(); + + private static final byte[] TEST_FAMILY = Bytes.toBytes("f1"); + + private static final byte[] ROW = Bytes.toBytes("aaa"); + + private static final byte[] QUAL = Bytes.toBytes("col1"); + + private static final byte[] VALUE = Bytes.toBytes("1"); + + @BeforeClass + public static void setupBeforeClass() throws Exception { + util.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, + MyObserver.class.getName()); + // Make general delay zero rather than default. Timing is off in this + // test that depends on an evironment edge that is manually moved forward. + util.getConfiguration().setInt(RemoteProcedureDispatcher.DISPATCH_DELAY_CONF_KEY, 0); + util.startMiniCluster(); + EnvironmentEdgeManager.injectEdge(mee); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + util.shutdownMiniCluster(); + } + + public static class MyObserver implements RegionCoprocessor, RegionObserver { + private static TimeRange tr10 = null; + private static TimeRange tr2 = null; + + @Override + public Optional getRegionObserver() { + return Optional.of(this); + } + + @Override + public Result preAppend(final ObserverContext e, + final Append append) throws IOException { + NavigableMap> map = append.getFamilyCellMap(); + for (Map.Entry> entry : map.entrySet()) { + for (Cell cell : entry.getValue()) { + String appendStr = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), + cell.getValueLength()); + if (appendStr.equals("b")) { + tr10 = append.getTimeRange(); + } else if (appendStr.equals("c") && !append.getTimeRange().isAllTime()) { + tr2 = append.getTimeRange(); + } + } + } + return null; + } + } + + @Test + public void testHTableInterfaceMethods() throws Exception { + try (Table table = util.createTable(TableName.valueOf(name.getMethodName()), TEST_FAMILY)) { + table.put(new Put(ROW).addColumn(TEST_FAMILY, QUAL, VALUE)); + long time = EnvironmentEdgeManager.currentTime(); + mee.setValue(time); + table.put(new Put(ROW).addColumn(TEST_FAMILY, QUAL, Bytes.toBytes("a"))); + checkRowValue(table, ROW, Bytes.toBytes("a")); + + time = EnvironmentEdgeManager.currentTime(); + mee.setValue(time); + TimeRange range10 = new TimeRange(1, time + 10); + Result r = table.append(new Append(ROW).addColumn(TEST_FAMILY, QUAL, Bytes.toBytes("b")) + .setTimeRange(range10.getMin(), range10.getMax())); + checkRowValue(table, ROW, Bytes.toBytes("ab")); + assertEquals(MyObserver.tr10.getMin(), range10.getMin()); + assertEquals(MyObserver.tr10.getMax(), range10.getMax()); + time = EnvironmentEdgeManager.currentTime(); + mee.setValue(time); + TimeRange range2 = new TimeRange(1, time+20); + List actions = + Arrays.asList(new Row[] { + new Append(ROW).addColumn(TEST_FAMILY, QUAL, Bytes.toBytes("c")) + .setTimeRange(range2.getMin(), range2.getMax()), + new Append(ROW).addColumn(TEST_FAMILY, QUAL, Bytes.toBytes("c")) + .setTimeRange(range2.getMin(), range2.getMax()) }); + Object[] results1 = new Object[actions.size()]; + table.batch(actions, results1); + assertEquals(MyObserver.tr2.getMin(), range2.getMin()); + assertEquals(MyObserver.tr2.getMax(), range2.getMax()); + for (Object r2 : results1) { + assertTrue(r2 instanceof Result); + } + checkRowValue(table, ROW, Bytes.toBytes("abcc")); + } + } + + private void checkRowValue(Table table, byte[] row, byte[] expectedValue) throws IOException { + Get get = new Get(row).addColumn(TEST_FAMILY, QUAL); + Result result = table.get(get); + byte[] actualValue = result.getValue(TEST_FAMILY, QUAL); + assertArrayEquals(expectedValue, actualValue); + } +}