diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Append.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Append.java index 0cb51a2c854..b2995ed0d68 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Append.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Append.java @@ -17,12 +17,12 @@ */ package org.apache.hadoop.hbase.client; +import java.io.IOException; import java.util.List; import java.util.Map; import java.util.NavigableMap; import java.util.UUID; import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.security.access.Permission; @@ -30,6 +30,8 @@ import org.apache.hadoop.hbase.security.visibility.CellVisibility; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ClassSize; import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Performs Append operations on a single row. @@ -44,6 +46,7 @@ import org.apache.yetus.audience.InterfaceAudience; */ @InterfaceAudience.Public public class Append extends Mutation { + private static final Logger LOG = LoggerFactory.getLogger(Append.class); private static final long HEAP_OVERHEAD = ClassSize.REFERENCE + ClassSize.TIMERANGE; private TimeRange tr = new TimeRange(); @@ -176,14 +179,12 @@ public class Append extends Mutation { */ @SuppressWarnings("unchecked") public Append add(final Cell cell) { - // Presume it is KeyValue for now. - byte [] family = CellUtil.cloneFamily(cell); - - // Get cell list for the family - List list = getCellList(family); - - // find where the new entry should be placed in the List - list.add(cell); + try { + super.add(cell); + } catch (IOException e) { + // we eat the exception of wrong row for BC.. + LOG.error(e.toString(), e); + } return this; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java index 57f5648eaf2..b5a0b93ebb2 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java @@ -25,7 +25,6 @@ import java.util.Map; import java.util.NavigableMap; import java.util.UUID; import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.security.access.Permission; @@ -170,22 +169,12 @@ public class Delete extends Mutation implements Comparable { /** * Add an existing delete marker to this Delete object. - * @param kv An existing KeyValue of type "delete". + * @param cell An existing cell of type "delete". * @return this for invocation chaining * @throws IOException */ - public Delete add(Cell kv) throws IOException { - if (!CellUtil.isDelete(kv)) { - throw new IOException("The recently added KeyValue is not of type " - + "delete. Rowkey: " + Bytes.toStringBinary(this.row)); - } - if (!CellUtil.matchingRows(kv, this.row)) { - throw new WrongRowIOException("The row in " + kv.toString() + - " doesn't match the original one " + Bytes.toStringBinary(this.row)); - } - byte [] family = CellUtil.cloneFamily(kv); - List list = getCellList(family); - list.add(kv); + public Delete add(Cell cell) throws IOException { + super.add(cell); return this; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java index e9ae8fb2746..1ccc7e99929 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java @@ -98,14 +98,7 @@ public class Increment extends Mutation implements Comparable { * @throws java.io.IOException e */ public Increment add(Cell cell) throws IOException{ - byte [] family = CellUtil.cloneFamily(cell); - List list = getCellList(family); - //Checking that the row of the kv is the same as the put - if (!CellUtil.matchingRows(cell, this.row)) { - throw new WrongRowIOException("The row in " + cell + - " doesn't match the original one " + Bytes.toStringBinary(this.row)); - } - list.add(cell); + super.add(cell); return this; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java index 3983f35ea35..9472d703f88 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java @@ -18,24 +18,31 @@ package org.apache.hadoop.hbase.client; +import static org.apache.hadoop.hbase.Tag.TAG_LENGTH_SIZE; + import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.NavigableMap; +import java.util.Optional; import java.util.TreeMap; import java.util.UUID; import java.util.stream.Collectors; +import org.apache.hadoop.hbase.ArrayBackedTag; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellScannable; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.ExtendedCell; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.PrivateCellUtil; +import org.apache.hadoop.hbase.RawCell; import org.apache.hadoop.hbase.Tag; import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.io.HeapSize; @@ -53,6 +60,7 @@ import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions; import org.apache.hadoop.hbase.shaded.com.google.common.collect.ArrayListMultimap; import org.apache.hadoop.hbase.shaded.com.google.common.collect.ListMultimap; +import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists; import org.apache.hadoop.hbase.shaded.com.google.common.io.ByteArrayDataInput; import org.apache.hadoop.hbase.shaded.com.google.common.io.ByteArrayDataOutput; import org.apache.hadoop.hbase.shaded.com.google.common.io.ByteStreams; @@ -757,4 +765,204 @@ public abstract class Mutation extends OperationWithAttributes implements Row, C HConstants.MAX_ROW_LENGTH); } } + + Mutation add(Cell cell) throws IOException { + //Checking that the row of the kv is the same as the mutation + // TODO: It is fraught with risk if user pass the wrong row. + // Throwing the IllegalArgumentException is more suitable I'd say. + if (!CellUtil.matchingRows(cell, this.row)) { + throw new WrongRowIOException("The row in " + cell.toString() + + " doesn't match the original one " + Bytes.toStringBinary(this.row)); + } + + if (cell.getFamilyArray() == null || cell.getFamilyLength() == 0) { + throw new IllegalArgumentException("Family cannot be null"); + } + + byte[] family = CellUtil.cloneFamily(cell); + if (cell instanceof ExtendedCell) { + getCellList(family).add(cell); + } else { + getCellList(family).add(new CellWrapper(cell)); + } + return this; + } + + private static final class CellWrapper implements ExtendedCell { + private static final long FIXED_OVERHEAD = ClassSize.align( + ClassSize.OBJECT // object header + + KeyValue.TIMESTAMP_SIZE // timestamp + + Bytes.SIZEOF_LONG // sequence id + + 1 * ClassSize.REFERENCE); // references to cell + private final Cell cell; + private long sequenceId; + private long timestamp; + + CellWrapper(Cell cell) { + assert !(cell instanceof ExtendedCell); + this.cell = cell; + this.sequenceId = cell.getSequenceId(); + this.timestamp = cell.getTimestamp(); + } + + @Override + public void setSequenceId(long seqId) { + sequenceId = seqId; + } + + @Override + public void setTimestamp(long ts) { + timestamp = ts; + } + + @Override + public void setTimestamp(byte[] ts) { + timestamp = Bytes.toLong(ts); + } + + @Override + public long getSequenceId() { + return sequenceId; + } + + @Override + public byte[] getValueArray() { + return cell.getValueArray(); + } + + @Override + public int getValueOffset() { + return cell.getValueOffset(); + } + + @Override + public int getValueLength() { + return cell.getValueLength(); + } + + @Override + public byte[] getTagsArray() { + return cell.getTagsArray(); + } + + @Override + public int getTagsOffset() { + return cell.getTagsOffset(); + } + + @Override + public int getTagsLength() { + return cell.getTagsLength(); + } + + @Override + public byte[] getRowArray() { + return cell.getRowArray(); + } + + @Override + public int getRowOffset() { + return cell.getRowOffset(); + } + + @Override + public short getRowLength() { + return cell.getRowLength(); + } + + @Override + public byte[] getFamilyArray() { + return cell.getFamilyArray(); + } + + @Override + public int getFamilyOffset() { + return cell.getFamilyOffset(); + } + + @Override + public byte getFamilyLength() { + return cell.getFamilyLength(); + } + + @Override + public byte[] getQualifierArray() { + return cell.getQualifierArray(); + } + + @Override + public int getQualifierOffset() { + return cell.getQualifierOffset(); + } + + @Override + public int getQualifierLength() { + return cell.getQualifierLength(); + } + + @Override + public long getTimestamp() { + return timestamp; + } + + @Override + public byte getTypeByte() { + return cell.getTypeByte(); + } + + @Override + public Optional getTag(byte type) { + if (cell instanceof RawCell) { + return ((RawCell) cell).getTag(type); + } + int length = getTagsLength(); + int offset = getTagsOffset(); + int pos = offset; + while (pos < offset + length) { + int tagLen = Bytes.readAsInt(getTagsArray(), pos, TAG_LENGTH_SIZE); + if (getTagsArray()[pos + TAG_LENGTH_SIZE] == type) { + return Optional.of(new ArrayBackedTag(getTagsArray(), pos, + tagLen + TAG_LENGTH_SIZE)); + } + pos += TAG_LENGTH_SIZE + tagLen; + } + return Optional.empty(); + } + + @Override + public List getTags() { + if (cell instanceof RawCell) { + return ((RawCell) cell).getTags(); + } + return Lists.newArrayList(PrivateCellUtil.tagsIterator(cell)); + } + + @Override + public byte[] cloneTags() { + if (cell instanceof RawCell) { + return ((RawCell) cell).cloneTags(); + } else { + return PrivateCellUtil.cloneTags(cell); + } + } + + private long heapOverhead() { + return FIXED_OVERHEAD + + ClassSize.ARRAY // row + + getFamilyLength() == 0 ? 0 : ClassSize.ARRAY + + getQualifierLength() == 0 ? 0 : ClassSize.ARRAY + + getValueLength() == 0 ? 0 : ClassSize.ARRAY + + getTagsLength() == 0 ? 0 : ClassSize.ARRAY; + } + + @Override + public long heapSize() { + return heapOverhead() + + ClassSize.align(getRowLength()) + + ClassSize.align(getFamilyLength()) + + ClassSize.align(getQualifierLength()) + + ClassSize.align(getValueLength()) + + ClassSize.align(getTagsLength()); + } + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Put.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Put.java index 1a1176ff63f..34ddf08e30d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Put.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Put.java @@ -26,7 +26,6 @@ import java.util.Map; import java.util.NavigableMap; import java.util.UUID; import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.IndividualBytesFieldCell; import org.apache.hadoop.hbase.KeyValue; @@ -275,30 +274,12 @@ public class Put extends Mutation implements HeapSize, Comparable { * Add the specified KeyValue to this Put operation. Operation assumes that * the passed KeyValue is immutable and its backing array will not be modified * for the duration of this Put. - * @param kv individual KeyValue + * @param cell individual cell * @return this * @throws java.io.IOException e */ - public Put add(Cell kv) throws IOException { - // Family can not be null, otherwise NullPointerException is thrown when putting - // the cell into familyMap - if (kv.getFamilyArray() == null) { - throw new IllegalArgumentException("Family cannot be null"); - } - - // Check timestamp - if (ts < 0) { - throw new IllegalArgumentException("Timestamp cannot be negative. ts=" + ts); - } - - byte [] family = CellUtil.cloneFamily(kv); - List list = getCellList(family); - //Checking that the row of the kv is the same as the put - if (!CellUtil.matchingRows(kv, this.row)) { - throw new WrongRowIOException("The row in " + kv.toString() + - " doesn't match the original one " + Bytes.toStringBinary(this.row)); - } - list.add(kv); + public Put add(Cell cell) throws IOException { + super.add(cell); return this; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestPassCustomCellViaRegionObserver.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestPassCustomCellViaRegionObserver.java new file mode 100644 index 00000000000..53abfa09bd3 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestPassCustomCellViaRegionObserver.java @@ -0,0 +1,403 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.coprocessor; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.apache.hadoop.hbase.CategoryBasedTimeout; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.CompareOperator; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Append; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Durability; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Increment; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.filter.ByteArrayComparable; +import org.apache.hadoop.hbase.testclassification.CoprocessorTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.wal.WALEdit; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; +import org.junit.rules.TestRule; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Category({ CoprocessorTests.class, MediumTests.class }) +public class TestPassCustomCellViaRegionObserver { + private static final Logger LOG = + LoggerFactory.getLogger(TestPassCustomCellViaRegionObserver.class); + + @Rule public final TestRule timeout = CategoryBasedTimeout.builder(). + withTimeout(this.getClass()).withLookingForStuckThread(true).build(); + + @Rule + public TestName testName = new TestName(); + + private TableName tableName; + private Table table = null; + + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + + private static final byte[] ROW = Bytes.toBytes("ROW"); + private static final byte[] FAMILY = Bytes.toBytes("FAMILY"); + private static final byte[] QUALIFIER = Bytes.toBytes("QUALIFIER"); + private static final byte[] VALUE = Bytes.toBytes(10L); + private static final byte[] APPEND_VALUE = Bytes.toBytes("MB"); + + private static final byte[] QUALIFIER_FROM_CP = Bytes.toBytes("QUALIFIER_FROM_CP"); + + @BeforeClass + public static void setupBeforeClass() throws Exception { + // small retry number can speed up the failed tests. + UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); + UTIL.startMiniCluster(); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + UTIL.shutdownMiniCluster(); + } + + @Before + public void clearTable() throws IOException { + RegionObserverImpl.COUNT.set(0); + tableName = TableName.valueOf(testName.getMethodName()); + if (table != null) { + table.close(); + } + try (Admin admin = UTIL.getAdmin()) { + for (TableName name : admin.listTableNames()) { + try { + admin.disableTable(name); + } catch (IOException e) { + } + admin.deleteTable(name); + } + table = UTIL.createTable(TableDescriptorBuilder.newBuilder(tableName) + .addColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)) + .addCoprocessor(RegionObserverImpl.class.getName()) + .build(), null); + } + } + + @Test + public void testMutation() throws Exception { + + Put put = new Put(ROW); + put.addColumn(FAMILY, QUALIFIER, VALUE); + table.put(put); + byte[] value = VALUE; + assertResult(table.get(new Get(ROW)), value, value); + assertObserverHasExecuted(); + + Increment inc = new Increment(ROW); + inc.addColumn(FAMILY, QUALIFIER, 10L); + table.increment(inc); + // QUALIFIER -> 10 (put) + 10 (increment) + // QUALIFIER_FROM_CP -> 10 (from cp's put) + 10 (from cp's increment) + value = Bytes.toBytes(20L); + assertResult(table.get(new Get(ROW)), value, value); + assertObserverHasExecuted(); + + Append append = new Append(ROW); + append.addColumn(FAMILY, QUALIFIER, APPEND_VALUE); + table.append(append); + // 10L + "MB" + value = ByteBuffer.wrap(new byte[value.length + APPEND_VALUE.length]) + .put(value) + .put(APPEND_VALUE) + .array(); + assertResult(table.get(new Get(ROW)), value, value); + assertObserverHasExecuted(); + + Delete delete = new Delete(ROW); + delete.addColumns(FAMILY, QUALIFIER); + table.delete(delete); + assertTrue(Arrays.asList(table.get(new Get(ROW)).rawCells()).toString(), + table.get(new Get(ROW)).isEmpty()); + assertObserverHasExecuted(); + + assertTrue(table.checkAndPut(ROW, FAMILY, QUALIFIER, null, put)); + assertObserverHasExecuted(); + + assertTrue(table.checkAndDelete(ROW, FAMILY, QUALIFIER, VALUE, delete)); + assertObserverHasExecuted(); + + assertTrue(table.get(new Get(ROW)).isEmpty()); + } + + @Test + public void testMultiPut() throws Exception { + List puts = IntStream.range(0, 10) + .mapToObj(i -> new Put(ROW).addColumn(FAMILY, Bytes.toBytes(i), VALUE)) + .collect(Collectors.toList()); + table.put(puts); + assertResult(table.get(new Get(ROW)), VALUE); + assertObserverHasExecuted(); + + List deletes = IntStream.range(0, 10) + .mapToObj(i -> new Delete(ROW).addColumn(FAMILY, Bytes.toBytes(i))) + .collect(Collectors.toList()); + table.delete(deletes); + assertTrue(table.get(new Get(ROW)).isEmpty()); + assertObserverHasExecuted(); + } + + private static void assertObserverHasExecuted() { + assertTrue(RegionObserverImpl.COUNT.getAndSet(0) > 0); + } + + private static void assertResult(Result result, byte[] expectedValue) { + assertFalse(result.isEmpty()); + for (Cell c : result.rawCells()) { + assertTrue(c.toString(), Bytes.equals(ROW, CellUtil.cloneRow(c))); + assertTrue(c.toString(), Bytes.equals(FAMILY, CellUtil.cloneFamily(c))); + assertTrue(c.toString(), Bytes.equals(expectedValue, CellUtil.cloneValue(c))); + } + } + + private static void assertResult(Result result, byte[] expectedValue, byte[] expectedFromCp) { + assertFalse(result.isEmpty()); + for (Cell c : result.rawCells()) { + assertTrue(c.toString(), Bytes.equals(ROW, CellUtil.cloneRow(c))); + assertTrue(c.toString(), Bytes.equals(FAMILY, CellUtil.cloneFamily(c))); + if (Bytes.equals(QUALIFIER, CellUtil.cloneQualifier(c))) { + assertTrue(c.toString(), Bytes.equals(expectedValue, CellUtil.cloneValue(c))); + } else if (Bytes.equals(QUALIFIER_FROM_CP, CellUtil.cloneQualifier(c))) { + assertTrue(c.toString(), Bytes.equals(expectedFromCp, CellUtil.cloneValue(c))); + } else { + fail("No valid qualifier"); + } + } + } + + private static Cell createCustomCell(byte[] row, byte[] family, byte[] qualifier, + Cell.DataType type, byte[] value) { + return new Cell() { + + private byte[] getArray(byte[] array) { + return array == null ? HConstants.EMPTY_BYTE_ARRAY : array; + } + + private int length(byte[] array) { + return array == null ? 0 : array.length; + } + + @Override + public byte[] getRowArray() { + return getArray(row); + } + + @Override + public int getRowOffset() { + return 0; + } + + @Override + public short getRowLength() { + return (short) length(row); + } + + @Override + public byte[] getFamilyArray() { + return getArray(family); + } + + @Override + public int getFamilyOffset() { + return 0; + } + + @Override + public byte getFamilyLength() { + return (byte) length(family); + } + + @Override + public byte[] getQualifierArray() { + return getArray(qualifier); + } + + @Override + public int getQualifierOffset() { + return 0; + } + + @Override + public int getQualifierLength() { + return length(qualifier); + } + + @Override + public long getTimestamp() { + return HConstants.LATEST_TIMESTAMP; + } + + @Override + public byte getTypeByte() { + return type.getCode(); + } + + @Override + public long getSequenceId() { + return 0; + } + + @Override + public byte[] getValueArray() { + return getArray(value); + } + + @Override + public int getValueOffset() { + return 0; + } + + @Override + public int getValueLength() { + return length(value); + } + + @Override + public byte[] getTagsArray() { + return getArray(null); + } + + @Override + public int getTagsOffset() { + return 0; + } + + @Override + public int getTagsLength() { + return length(null); + } + + @Override + public DataType getType() { + return type; + } + }; + } + + private static Cell createCustomCell(Put put) { + return createCustomCell(put.getRow(), FAMILY, QUALIFIER_FROM_CP, Cell.DataType.Put, VALUE); + } + + private static Cell createCustomCell(Append append) { + return createCustomCell(append.getRow(), FAMILY, QUALIFIER_FROM_CP, Cell.DataType.Put, + APPEND_VALUE); + } + + private static Cell createCustomCell(Increment inc) { + return createCustomCell(inc.getRow(), FAMILY, QUALIFIER_FROM_CP, Cell.DataType.Put, VALUE); + } + + private static Cell createCustomCell(Delete delete) { + return createCustomCell(delete.getRow(), FAMILY, QUALIFIER_FROM_CP, + Cell.DataType.DeleteColumn, null); + } + + public static class RegionObserverImpl implements RegionCoprocessor, RegionObserver { + static final AtomicInteger COUNT = new AtomicInteger(0); + + @Override + public Optional getRegionObserver() { + return Optional.of(this); + } + + @Override + public void prePut(ObserverContext c, Put put, WALEdit edit, + Durability durability) throws IOException { + put.add(createCustomCell(put)); + COUNT.incrementAndGet(); + } + + @Override + public void preDelete(ObserverContext c, Delete delete, + WALEdit edit, Durability durability) throws IOException { + delete.add(createCustomCell(delete)); + COUNT.incrementAndGet(); + } + + @Override + public boolean preCheckAndPut(ObserverContext c, byte[] row, + byte[] family, byte[] qualifier, CompareOperator op, ByteArrayComparable comparator, Put put, + boolean result) throws IOException { + put.add(createCustomCell(put)); + COUNT.incrementAndGet(); + return result; + } + + @Override + public boolean preCheckAndDelete(ObserverContext c, byte[] row, + byte[] family, byte[] qualifier, CompareOperator op, ByteArrayComparable comparator, + Delete delete, boolean result) throws IOException { + delete.add(createCustomCell(delete)); + COUNT.incrementAndGet(); + return result; + } + + @Override + public Result preAppend(ObserverContext c, Append append) + throws IOException { + append.add(createCustomCell(append)); + COUNT.incrementAndGet(); + return null; + } + + + @Override + public Result preIncrement(ObserverContext c, Increment increment) + throws IOException { + increment.add(createCustomCell(increment)); + COUNT.incrementAndGet(); + return null; + } + + } + +}