HBASE-19550 Wrap the cell passed via Mutation#add(Cell) to be of ExtendedCell
This commit is contained in:
parent
1556939236
commit
7ce1943ef3
|
@ -17,12 +17,12 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.NavigableMap;
|
||||
import java.util.UUID;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.CellUtil;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.io.TimeRange;
|
||||
import org.apache.hadoop.hbase.security.access.Permission;
|
||||
|
@ -30,6 +30,8 @@ import org.apache.hadoop.hbase.security.visibility.CellVisibility;
|
|||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.ClassSize;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Performs Append operations on a single row.
|
||||
|
@ -44,6 +46,7 @@ import org.apache.yetus.audience.InterfaceAudience;
|
|||
*/
|
||||
@InterfaceAudience.Public
|
||||
public class Append extends Mutation {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(Append.class);
|
||||
private static final long HEAP_OVERHEAD = ClassSize.REFERENCE + ClassSize.TIMERANGE;
|
||||
private TimeRange tr = new TimeRange();
|
||||
|
||||
|
@ -176,14 +179,12 @@ public class Append extends Mutation {
|
|||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public Append add(final Cell cell) {
|
||||
// Presume it is KeyValue for now.
|
||||
byte [] family = CellUtil.cloneFamily(cell);
|
||||
|
||||
// Get cell list for the family
|
||||
List<Cell> list = getCellList(family);
|
||||
|
||||
// find where the new entry should be placed in the List
|
||||
list.add(cell);
|
||||
try {
|
||||
super.add(cell);
|
||||
} catch (IOException e) {
|
||||
// we eat the exception of wrong row for BC..
|
||||
LOG.error(e.toString(), e);
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
||||
|
|
|
@ -25,7 +25,6 @@ import java.util.Map;
|
|||
import java.util.NavigableMap;
|
||||
import java.util.UUID;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.CellUtil;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.security.access.Permission;
|
||||
|
@ -170,22 +169,12 @@ public class Delete extends Mutation implements Comparable<Row> {
|
|||
|
||||
/**
|
||||
* Add an existing delete marker to this Delete object.
|
||||
* @param kv An existing KeyValue of type "delete".
|
||||
* @param cell An existing cell of type "delete".
|
||||
* @return this for invocation chaining
|
||||
* @throws IOException
|
||||
*/
|
||||
public Delete add(Cell kv) throws IOException {
|
||||
if (!CellUtil.isDelete(kv)) {
|
||||
throw new IOException("The recently added KeyValue is not of type "
|
||||
+ "delete. Rowkey: " + Bytes.toStringBinary(this.row));
|
||||
}
|
||||
if (!CellUtil.matchingRows(kv, this.row)) {
|
||||
throw new WrongRowIOException("The row in " + kv.toString() +
|
||||
" doesn't match the original one " + Bytes.toStringBinary(this.row));
|
||||
}
|
||||
byte [] family = CellUtil.cloneFamily(kv);
|
||||
List<Cell> list = getCellList(family);
|
||||
list.add(kv);
|
||||
public Delete add(Cell cell) throws IOException {
|
||||
super.add(cell);
|
||||
return this;
|
||||
}
|
||||
|
||||
|
|
|
@ -98,14 +98,7 @@ public class Increment extends Mutation implements Comparable<Row> {
|
|||
* @throws java.io.IOException e
|
||||
*/
|
||||
public Increment add(Cell cell) throws IOException{
|
||||
byte [] family = CellUtil.cloneFamily(cell);
|
||||
List<Cell> list = getCellList(family);
|
||||
//Checking that the row of the kv is the same as the put
|
||||
if (!CellUtil.matchingRows(cell, this.row)) {
|
||||
throw new WrongRowIOException("The row in " + cell +
|
||||
" doesn't match the original one " + Bytes.toStringBinary(this.row));
|
||||
}
|
||||
list.add(cell);
|
||||
super.add(cell);
|
||||
return this;
|
||||
}
|
||||
|
||||
|
|
|
@ -18,24 +18,31 @@
|
|||
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import static org.apache.hadoop.hbase.Tag.TAG_LENGTH_SIZE;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.NavigableMap;
|
||||
import java.util.Optional;
|
||||
import java.util.TreeMap;
|
||||
import java.util.UUID;
|
||||
import java.util.stream.Collectors;
|
||||
import org.apache.hadoop.hbase.ArrayBackedTag;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.CellScannable;
|
||||
import org.apache.hadoop.hbase.CellScanner;
|
||||
import org.apache.hadoop.hbase.CellUtil;
|
||||
import org.apache.hadoop.hbase.ExtendedCell;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.PrivateCellUtil;
|
||||
import org.apache.hadoop.hbase.RawCell;
|
||||
import org.apache.hadoop.hbase.Tag;
|
||||
import org.apache.hadoop.hbase.exceptions.DeserializationException;
|
||||
import org.apache.hadoop.hbase.io.HeapSize;
|
||||
|
@ -53,6 +60,7 @@ import org.apache.yetus.audience.InterfaceAudience;
|
|||
import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
|
||||
import org.apache.hadoop.hbase.shaded.com.google.common.collect.ArrayListMultimap;
|
||||
import org.apache.hadoop.hbase.shaded.com.google.common.collect.ListMultimap;
|
||||
import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
|
||||
import org.apache.hadoop.hbase.shaded.com.google.common.io.ByteArrayDataInput;
|
||||
import org.apache.hadoop.hbase.shaded.com.google.common.io.ByteArrayDataOutput;
|
||||
import org.apache.hadoop.hbase.shaded.com.google.common.io.ByteStreams;
|
||||
|
@ -757,4 +765,204 @@ public abstract class Mutation extends OperationWithAttributes implements Row, C
|
|||
HConstants.MAX_ROW_LENGTH);
|
||||
}
|
||||
}
|
||||
|
||||
Mutation add(Cell cell) throws IOException {
|
||||
//Checking that the row of the kv is the same as the mutation
|
||||
// TODO: It is fraught with risk if user pass the wrong row.
|
||||
// Throwing the IllegalArgumentException is more suitable I'd say.
|
||||
if (!CellUtil.matchingRows(cell, this.row)) {
|
||||
throw new WrongRowIOException("The row in " + cell.toString() +
|
||||
" doesn't match the original one " + Bytes.toStringBinary(this.row));
|
||||
}
|
||||
|
||||
if (cell.getFamilyArray() == null || cell.getFamilyLength() == 0) {
|
||||
throw new IllegalArgumentException("Family cannot be null");
|
||||
}
|
||||
|
||||
byte[] family = CellUtil.cloneFamily(cell);
|
||||
if (cell instanceof ExtendedCell) {
|
||||
getCellList(family).add(cell);
|
||||
} else {
|
||||
getCellList(family).add(new CellWrapper(cell));
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
||||
private static final class CellWrapper implements ExtendedCell {
|
||||
private static final long FIXED_OVERHEAD = ClassSize.align(
|
||||
ClassSize.OBJECT // object header
|
||||
+ KeyValue.TIMESTAMP_SIZE // timestamp
|
||||
+ Bytes.SIZEOF_LONG // sequence id
|
||||
+ 1 * ClassSize.REFERENCE); // references to cell
|
||||
private final Cell cell;
|
||||
private long sequenceId;
|
||||
private long timestamp;
|
||||
|
||||
CellWrapper(Cell cell) {
|
||||
assert !(cell instanceof ExtendedCell);
|
||||
this.cell = cell;
|
||||
this.sequenceId = cell.getSequenceId();
|
||||
this.timestamp = cell.getTimestamp();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setSequenceId(long seqId) {
|
||||
sequenceId = seqId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setTimestamp(long ts) {
|
||||
timestamp = ts;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setTimestamp(byte[] ts) {
|
||||
timestamp = Bytes.toLong(ts);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getSequenceId() {
|
||||
return sequenceId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] getValueArray() {
|
||||
return cell.getValueArray();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getValueOffset() {
|
||||
return cell.getValueOffset();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getValueLength() {
|
||||
return cell.getValueLength();
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] getTagsArray() {
|
||||
return cell.getTagsArray();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getTagsOffset() {
|
||||
return cell.getTagsOffset();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getTagsLength() {
|
||||
return cell.getTagsLength();
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] getRowArray() {
|
||||
return cell.getRowArray();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getRowOffset() {
|
||||
return cell.getRowOffset();
|
||||
}
|
||||
|
||||
@Override
|
||||
public short getRowLength() {
|
||||
return cell.getRowLength();
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] getFamilyArray() {
|
||||
return cell.getFamilyArray();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getFamilyOffset() {
|
||||
return cell.getFamilyOffset();
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte getFamilyLength() {
|
||||
return cell.getFamilyLength();
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] getQualifierArray() {
|
||||
return cell.getQualifierArray();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getQualifierOffset() {
|
||||
return cell.getQualifierOffset();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getQualifierLength() {
|
||||
return cell.getQualifierLength();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getTimestamp() {
|
||||
return timestamp;
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte getTypeByte() {
|
||||
return cell.getTypeByte();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<Tag> getTag(byte type) {
|
||||
if (cell instanceof RawCell) {
|
||||
return ((RawCell) cell).getTag(type);
|
||||
}
|
||||
int length = getTagsLength();
|
||||
int offset = getTagsOffset();
|
||||
int pos = offset;
|
||||
while (pos < offset + length) {
|
||||
int tagLen = Bytes.readAsInt(getTagsArray(), pos, TAG_LENGTH_SIZE);
|
||||
if (getTagsArray()[pos + TAG_LENGTH_SIZE] == type) {
|
||||
return Optional.of(new ArrayBackedTag(getTagsArray(), pos,
|
||||
tagLen + TAG_LENGTH_SIZE));
|
||||
}
|
||||
pos += TAG_LENGTH_SIZE + tagLen;
|
||||
}
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Tag> getTags() {
|
||||
if (cell instanceof RawCell) {
|
||||
return ((RawCell) cell).getTags();
|
||||
}
|
||||
return Lists.newArrayList(PrivateCellUtil.tagsIterator(cell));
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] cloneTags() {
|
||||
if (cell instanceof RawCell) {
|
||||
return ((RawCell) cell).cloneTags();
|
||||
} else {
|
||||
return PrivateCellUtil.cloneTags(cell);
|
||||
}
|
||||
}
|
||||
|
||||
private long heapOverhead() {
|
||||
return FIXED_OVERHEAD
|
||||
+ ClassSize.ARRAY // row
|
||||
+ getFamilyLength() == 0 ? 0 : ClassSize.ARRAY
|
||||
+ getQualifierLength() == 0 ? 0 : ClassSize.ARRAY
|
||||
+ getValueLength() == 0 ? 0 : ClassSize.ARRAY
|
||||
+ getTagsLength() == 0 ? 0 : ClassSize.ARRAY;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long heapSize() {
|
||||
return heapOverhead()
|
||||
+ ClassSize.align(getRowLength())
|
||||
+ ClassSize.align(getFamilyLength())
|
||||
+ ClassSize.align(getQualifierLength())
|
||||
+ ClassSize.align(getValueLength())
|
||||
+ ClassSize.align(getTagsLength());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -26,7 +26,6 @@ import java.util.Map;
|
|||
import java.util.NavigableMap;
|
||||
import java.util.UUID;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.CellUtil;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.IndividualBytesFieldCell;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
|
@ -275,30 +274,12 @@ public class Put extends Mutation implements HeapSize, Comparable<Row> {
|
|||
* Add the specified KeyValue to this Put operation. Operation assumes that
|
||||
* the passed KeyValue is immutable and its backing array will not be modified
|
||||
* for the duration of this Put.
|
||||
* @param kv individual KeyValue
|
||||
* @param cell individual cell
|
||||
* @return this
|
||||
* @throws java.io.IOException e
|
||||
*/
|
||||
public Put add(Cell kv) throws IOException {
|
||||
// Family can not be null, otherwise NullPointerException is thrown when putting
|
||||
// the cell into familyMap
|
||||
if (kv.getFamilyArray() == null) {
|
||||
throw new IllegalArgumentException("Family cannot be null");
|
||||
}
|
||||
|
||||
// Check timestamp
|
||||
if (ts < 0) {
|
||||
throw new IllegalArgumentException("Timestamp cannot be negative. ts=" + ts);
|
||||
}
|
||||
|
||||
byte [] family = CellUtil.cloneFamily(kv);
|
||||
List<Cell> list = getCellList(family);
|
||||
//Checking that the row of the kv is the same as the put
|
||||
if (!CellUtil.matchingRows(kv, this.row)) {
|
||||
throw new WrongRowIOException("The row in " + kv.toString() +
|
||||
" doesn't match the original one " + Bytes.toStringBinary(this.row));
|
||||
}
|
||||
list.add(kv);
|
||||
public Put add(Cell cell) throws IOException {
|
||||
super.add(cell);
|
||||
return this;
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,403 @@
|
|||
/**
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.coprocessor;
|
||||
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.IntStream;
|
||||
import org.apache.hadoop.hbase.CategoryBasedTimeout;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.CellUtil;
|
||||
import org.apache.hadoop.hbase.CompareOperator;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.Admin;
|
||||
import org.apache.hadoop.hbase.client.Append;
|
||||
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
|
||||
import org.apache.hadoop.hbase.client.Delete;
|
||||
import org.apache.hadoop.hbase.client.Durability;
|
||||
import org.apache.hadoop.hbase.client.Get;
|
||||
import org.apache.hadoop.hbase.client.Increment;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
||||
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
|
||||
import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.wal.WALEdit;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.junit.rules.TestName;
|
||||
import org.junit.rules.TestRule;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@Category({ CoprocessorTests.class, MediumTests.class })
|
||||
public class TestPassCustomCellViaRegionObserver {
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(TestPassCustomCellViaRegionObserver.class);
|
||||
|
||||
@Rule public final TestRule timeout = CategoryBasedTimeout.builder().
|
||||
withTimeout(this.getClass()).withLookingForStuckThread(true).build();
|
||||
|
||||
@Rule
|
||||
public TestName testName = new TestName();
|
||||
|
||||
private TableName tableName;
|
||||
private Table table = null;
|
||||
|
||||
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
|
||||
|
||||
private static final byte[] ROW = Bytes.toBytes("ROW");
|
||||
private static final byte[] FAMILY = Bytes.toBytes("FAMILY");
|
||||
private static final byte[] QUALIFIER = Bytes.toBytes("QUALIFIER");
|
||||
private static final byte[] VALUE = Bytes.toBytes(10L);
|
||||
private static final byte[] APPEND_VALUE = Bytes.toBytes("MB");
|
||||
|
||||
private static final byte[] QUALIFIER_FROM_CP = Bytes.toBytes("QUALIFIER_FROM_CP");
|
||||
|
||||
@BeforeClass
|
||||
public static void setupBeforeClass() throws Exception {
|
||||
// small retry number can speed up the failed tests.
|
||||
UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
|
||||
UTIL.startMiniCluster();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDownAfterClass() throws Exception {
|
||||
UTIL.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
@Before
|
||||
public void clearTable() throws IOException {
|
||||
RegionObserverImpl.COUNT.set(0);
|
||||
tableName = TableName.valueOf(testName.getMethodName());
|
||||
if (table != null) {
|
||||
table.close();
|
||||
}
|
||||
try (Admin admin = UTIL.getAdmin()) {
|
||||
for (TableName name : admin.listTableNames()) {
|
||||
try {
|
||||
admin.disableTable(name);
|
||||
} catch (IOException e) {
|
||||
}
|
||||
admin.deleteTable(name);
|
||||
}
|
||||
table = UTIL.createTable(TableDescriptorBuilder.newBuilder(tableName)
|
||||
.addColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY))
|
||||
.addCoprocessor(RegionObserverImpl.class.getName())
|
||||
.build(), null);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMutation() throws Exception {
|
||||
|
||||
Put put = new Put(ROW);
|
||||
put.addColumn(FAMILY, QUALIFIER, VALUE);
|
||||
table.put(put);
|
||||
byte[] value = VALUE;
|
||||
assertResult(table.get(new Get(ROW)), value, value);
|
||||
assertObserverHasExecuted();
|
||||
|
||||
Increment inc = new Increment(ROW);
|
||||
inc.addColumn(FAMILY, QUALIFIER, 10L);
|
||||
table.increment(inc);
|
||||
// QUALIFIER -> 10 (put) + 10 (increment)
|
||||
// QUALIFIER_FROM_CP -> 10 (from cp's put) + 10 (from cp's increment)
|
||||
value = Bytes.toBytes(20L);
|
||||
assertResult(table.get(new Get(ROW)), value, value);
|
||||
assertObserverHasExecuted();
|
||||
|
||||
Append append = new Append(ROW);
|
||||
append.addColumn(FAMILY, QUALIFIER, APPEND_VALUE);
|
||||
table.append(append);
|
||||
// 10L + "MB"
|
||||
value = ByteBuffer.wrap(new byte[value.length + APPEND_VALUE.length])
|
||||
.put(value)
|
||||
.put(APPEND_VALUE)
|
||||
.array();
|
||||
assertResult(table.get(new Get(ROW)), value, value);
|
||||
assertObserverHasExecuted();
|
||||
|
||||
Delete delete = new Delete(ROW);
|
||||
delete.addColumns(FAMILY, QUALIFIER);
|
||||
table.delete(delete);
|
||||
assertTrue(Arrays.asList(table.get(new Get(ROW)).rawCells()).toString(),
|
||||
table.get(new Get(ROW)).isEmpty());
|
||||
assertObserverHasExecuted();
|
||||
|
||||
assertTrue(table.checkAndPut(ROW, FAMILY, QUALIFIER, null, put));
|
||||
assertObserverHasExecuted();
|
||||
|
||||
assertTrue(table.checkAndDelete(ROW, FAMILY, QUALIFIER, VALUE, delete));
|
||||
assertObserverHasExecuted();
|
||||
|
||||
assertTrue(table.get(new Get(ROW)).isEmpty());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMultiPut() throws Exception {
|
||||
List<Put> puts = IntStream.range(0, 10)
|
||||
.mapToObj(i -> new Put(ROW).addColumn(FAMILY, Bytes.toBytes(i), VALUE))
|
||||
.collect(Collectors.toList());
|
||||
table.put(puts);
|
||||
assertResult(table.get(new Get(ROW)), VALUE);
|
||||
assertObserverHasExecuted();
|
||||
|
||||
List<Delete> deletes = IntStream.range(0, 10)
|
||||
.mapToObj(i -> new Delete(ROW).addColumn(FAMILY, Bytes.toBytes(i)))
|
||||
.collect(Collectors.toList());
|
||||
table.delete(deletes);
|
||||
assertTrue(table.get(new Get(ROW)).isEmpty());
|
||||
assertObserverHasExecuted();
|
||||
}
|
||||
|
||||
private static void assertObserverHasExecuted() {
|
||||
assertTrue(RegionObserverImpl.COUNT.getAndSet(0) > 0);
|
||||
}
|
||||
|
||||
private static void assertResult(Result result, byte[] expectedValue) {
|
||||
assertFalse(result.isEmpty());
|
||||
for (Cell c : result.rawCells()) {
|
||||
assertTrue(c.toString(), Bytes.equals(ROW, CellUtil.cloneRow(c)));
|
||||
assertTrue(c.toString(), Bytes.equals(FAMILY, CellUtil.cloneFamily(c)));
|
||||
assertTrue(c.toString(), Bytes.equals(expectedValue, CellUtil.cloneValue(c)));
|
||||
}
|
||||
}
|
||||
|
||||
private static void assertResult(Result result, byte[] expectedValue, byte[] expectedFromCp) {
|
||||
assertFalse(result.isEmpty());
|
||||
for (Cell c : result.rawCells()) {
|
||||
assertTrue(c.toString(), Bytes.equals(ROW, CellUtil.cloneRow(c)));
|
||||
assertTrue(c.toString(), Bytes.equals(FAMILY, CellUtil.cloneFamily(c)));
|
||||
if (Bytes.equals(QUALIFIER, CellUtil.cloneQualifier(c))) {
|
||||
assertTrue(c.toString(), Bytes.equals(expectedValue, CellUtil.cloneValue(c)));
|
||||
} else if (Bytes.equals(QUALIFIER_FROM_CP, CellUtil.cloneQualifier(c))) {
|
||||
assertTrue(c.toString(), Bytes.equals(expectedFromCp, CellUtil.cloneValue(c)));
|
||||
} else {
|
||||
fail("No valid qualifier");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static Cell createCustomCell(byte[] row, byte[] family, byte[] qualifier,
|
||||
Cell.DataType type, byte[] value) {
|
||||
return new Cell() {
|
||||
|
||||
private byte[] getArray(byte[] array) {
|
||||
return array == null ? HConstants.EMPTY_BYTE_ARRAY : array;
|
||||
}
|
||||
|
||||
private int length(byte[] array) {
|
||||
return array == null ? 0 : array.length;
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] getRowArray() {
|
||||
return getArray(row);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getRowOffset() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public short getRowLength() {
|
||||
return (short) length(row);
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] getFamilyArray() {
|
||||
return getArray(family);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getFamilyOffset() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte getFamilyLength() {
|
||||
return (byte) length(family);
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] getQualifierArray() {
|
||||
return getArray(qualifier);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getQualifierOffset() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getQualifierLength() {
|
||||
return length(qualifier);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getTimestamp() {
|
||||
return HConstants.LATEST_TIMESTAMP;
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte getTypeByte() {
|
||||
return type.getCode();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getSequenceId() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] getValueArray() {
|
||||
return getArray(value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getValueOffset() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getValueLength() {
|
||||
return length(value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] getTagsArray() {
|
||||
return getArray(null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getTagsOffset() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getTagsLength() {
|
||||
return length(null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public DataType getType() {
|
||||
return type;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private static Cell createCustomCell(Put put) {
|
||||
return createCustomCell(put.getRow(), FAMILY, QUALIFIER_FROM_CP, Cell.DataType.Put, VALUE);
|
||||
}
|
||||
|
||||
private static Cell createCustomCell(Append append) {
|
||||
return createCustomCell(append.getRow(), FAMILY, QUALIFIER_FROM_CP, Cell.DataType.Put,
|
||||
APPEND_VALUE);
|
||||
}
|
||||
|
||||
private static Cell createCustomCell(Increment inc) {
|
||||
return createCustomCell(inc.getRow(), FAMILY, QUALIFIER_FROM_CP, Cell.DataType.Put, VALUE);
|
||||
}
|
||||
|
||||
private static Cell createCustomCell(Delete delete) {
|
||||
return createCustomCell(delete.getRow(), FAMILY, QUALIFIER_FROM_CP,
|
||||
Cell.DataType.DeleteColumn, null);
|
||||
}
|
||||
|
||||
public static class RegionObserverImpl implements RegionCoprocessor, RegionObserver {
|
||||
static final AtomicInteger COUNT = new AtomicInteger(0);
|
||||
|
||||
@Override
|
||||
public Optional<RegionObserver> getRegionObserver() {
|
||||
return Optional.of(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void prePut(ObserverContext<RegionCoprocessorEnvironment> c, Put put, WALEdit edit,
|
||||
Durability durability) throws IOException {
|
||||
put.add(createCustomCell(put));
|
||||
COUNT.incrementAndGet();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void preDelete(ObserverContext<RegionCoprocessorEnvironment> c, Delete delete,
|
||||
WALEdit edit, Durability durability) throws IOException {
|
||||
delete.add(createCustomCell(delete));
|
||||
COUNT.incrementAndGet();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean preCheckAndPut(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row,
|
||||
byte[] family, byte[] qualifier, CompareOperator op, ByteArrayComparable comparator, Put put,
|
||||
boolean result) throws IOException {
|
||||
put.add(createCustomCell(put));
|
||||
COUNT.incrementAndGet();
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean preCheckAndDelete(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row,
|
||||
byte[] family, byte[] qualifier, CompareOperator op, ByteArrayComparable comparator,
|
||||
Delete delete, boolean result) throws IOException {
|
||||
delete.add(createCustomCell(delete));
|
||||
COUNT.incrementAndGet();
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Result preAppend(ObserverContext<RegionCoprocessorEnvironment> c, Append append)
|
||||
throws IOException {
|
||||
append.add(createCustomCell(append));
|
||||
COUNT.incrementAndGet();
|
||||
return null;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public Result preIncrement(ObserverContext<RegionCoprocessorEnvironment> c, Increment increment)
|
||||
throws IOException {
|
||||
increment.add(createCustomCell(increment));
|
||||
COUNT.incrementAndGet();
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue