HBASE-9874 Append and Increment operation drops Tags

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1539224 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
anoopsamjohn 2013-11-06 03:11:58 +00:00
parent 18ecb48662
commit b4921b03b0
8 changed files with 232 additions and 15 deletions

View File

@ -627,7 +627,9 @@ public final class ProtobufUtil {
"Missing required field: qualifer value");
}
byte[] value = qv.getValue().toByteArray();
append.add(family, qualifier, value);
byte[] tags = qv.getTags().toByteArray();
append.add(CellUtil.createCell(row, family, qualifier, append.getTimeStamp(),
KeyValue.Type.Put, value, tags));
}
}
}
@ -699,8 +701,10 @@ public final class ProtobufUtil {
if (!qv.hasValue()) {
throw new DoNotRetryIOException("Missing required field: qualifer value");
}
long value = Bytes.toLong(qv.getValue().toByteArray());
increment.addColumn(family, qualifier, value);
byte[] value = qv.getValue().toByteArray();
byte[] tags = qv.getTags().toByteArray();
increment.add(CellUtil.createCell(row, family, qualifier, increment.getTimeStamp(),
KeyValue.Type.Put, value, tags));
}
}
}
@ -973,6 +977,10 @@ public final class ProtobufUtil {
kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength()));
valueBuilder.setValue(ZeroCopyLiteralByteString.wrap(
kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()));
if (kv.getTagsLength() > 0) {
valueBuilder.setTags(ZeroCopyLiteralByteString.wrap(kv.getTagsArray(),
kv.getTagsOffset(), kv.getTagsLength()));
}
columnBuilder.addQualifierValue(valueBuilder.build());
}
}

View File

@ -172,6 +172,12 @@ public final class CellUtil {
return keyValue;
}
/**
 * Creates a {@link Cell} carrying tags, backed by a {@link KeyValue}.
 * Companion to the tag-less overloads above; used by server-side paths
 * (e.g. Append/Increment deserialization) that must not drop cell tags.
 *
 * @param row row key
 * @param family column family name
 * @param qualifier column qualifier
 * @param timestamp cell timestamp
 * @param type the KeyValue type code (e.g. Put)
 * @param value cell value bytes
 * @param tags serialized tag bytes (may be empty)
 * @return a Cell built from the given components including tags
 */
public static Cell createCell(final byte[] row, final byte[] family, final byte[] qualifier,
    final long timestamp, Type type, final byte[] value, byte[] tags) {
  // Delegate straight to the tag-aware KeyValue constructor; no temp needed.
  return new KeyValue(row, family, qualifier, timestamp, type, value, tags);
}
/**
* @param cellScannerables
* @return CellScanner interface over <code>cellIterables</code>

View File

@ -809,7 +809,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable {
pos += flength + qlength;
pos = Bytes.putLong(bytes, pos, timestamp);
pos = Bytes.putByte(bytes, pos, type.getCode());
pos += keylength + vlength;
pos += vlength;
if (tagsLength > 0) {
pos = Bytes.putShort(bytes, pos, (short)(tagsLength & 0x0000ffff));
}

View File

@ -477,4 +477,10 @@ public abstract class BaseRegionObserver implements RegionObserver {
DataBlockEncoding preferredEncodingInCache, Reference r, Reader reader) throws IOException {
return reader;
}
/**
 * {@inheritDoc}
 * <p>
 * Default no-op implementation: returns {@code newCell} unchanged, so the
 * cell computed by the region server is committed as-is. Subclasses override
 * this to inspect or replace the cell before it reaches the WAL/memstore.
 */
@Override
public Cell postMutationBeforeWAL(ObserverContext<RegionCoprocessorEnvironment> ctx,
    MutationType opType, Mutation mutation, Cell oldCell, Cell newCell) throws IOException {
  return newCell;
}
}

View File

@ -67,6 +67,11 @@ import org.apache.hadoop.hbase.util.Pair;
@InterfaceStability.Evolving
public interface RegionObserver extends Coprocessor {
/**
 * Mutation type passed to the {@code postMutationBeforeWAL} hook, identifying
 * which read-modify-write operation produced the new cell.
 */
public enum MutationType {
  APPEND,    // new cell produced by an Append operation
  INCREMENT  // new cell produced by an Increment operation
}
/**
* Called before the region is reported as open to the master.
* @param c the environment provided by the region server
@ -1052,4 +1057,20 @@ public interface RegionObserver extends Coprocessor {
final FileSystem fs, final Path p, final FSDataInputStreamWrapper in, long size,
final CacheConfig cacheConf, final DataBlockEncoding preferredEncodingInCache,
final Reference r, StoreFile.Reader reader) throws IOException;
/**
 * Called after a new cell has been created during an append or increment
 * operation, but before it is committed to the WAL or memstore. The
 * {@code opType} distinguishes the two cases ({@code MutationType.APPEND}
 * vs {@code MutationType.INCREMENT}).
 * Calling {@link org.apache.hadoop.hbase.coprocessor.ObserverContext#bypass()} has no
 * effect in this hook.
 * @param ctx the environment provided by the region server
 * @param opType the operation type
 * @param mutation the current mutation
 * @param oldCell old cell containing previous value, or null if the column had no prior cell
 * @param newCell the new cell containing the computed value
 * @return the new cell, possibly changed
 * @throws IOException
 */
Cell postMutationBeforeWAL(ObserverContext<RegionCoprocessorEnvironment> ctx,
    MutationType opType, Mutation mutation, Cell oldCell, Cell newCell) throws IOException;
}

View File

@ -94,6 +94,7 @@ import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare;
import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
import org.apache.hadoop.hbase.exceptions.RegionInRecoveryException;
@ -4667,13 +4668,15 @@ public class HRegion implements HeapSize { // , Writable{
for (Cell cell : family.getValue()) {
KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
KeyValue newKV;
KeyValue oldKv = null;
if (idx < results.size()
&& CellUtil.matchingQualifier(results.get(idx),kv)) {
KeyValue oldKv = KeyValueUtil.ensureKeyValue(results.get(idx));
oldKv = KeyValueUtil.ensureKeyValue(results.get(idx));
// allocate an empty kv once
newKV = new KeyValue(row.length, kv.getFamilyLength(),
kv.getQualifierLength(), now, KeyValue.Type.Put,
oldKv.getValueLength() + kv.getValueLength());
oldKv.getValueLength() + kv.getValueLength(),
oldKv.getTagsLength() + kv.getTagsLength());
// copy in the value
System.arraycopy(oldKv.getBuffer(), oldKv.getValueOffset(),
newKV.getBuffer(), newKV.getValueOffset(),
@ -4682,16 +4685,24 @@ public class HRegion implements HeapSize { // , Writable{
newKV.getBuffer(),
newKV.getValueOffset() + oldKv.getValueLength(),
kv.getValueLength());
// copy in the tags
System.arraycopy(oldKv.getBuffer(), oldKv.getTagsOffset(), newKV.getBuffer(),
newKV.getTagsOffset(), oldKv.getTagsLength());
System.arraycopy(kv.getBuffer(), kv.getTagsOffset(), newKV.getBuffer(),
newKV.getTagsOffset() + oldKv.getTagsLength(), kv.getTagsLength());
idx++;
} else {
// allocate an empty kv once
newKV = new KeyValue(row.length, kv.getFamilyLength(),
kv.getQualifierLength(), now, KeyValue.Type.Put,
kv.getValueLength());
kv.getValueLength(), kv.getTagsLength());
// copy in the value
System.arraycopy(kv.getBuffer(), kv.getValueOffset(),
newKV.getBuffer(), newKV.getValueOffset(),
kv.getValueLength());
// copy in tags
System.arraycopy(kv.getBuffer(), kv.getTagsOffset(), newKV.getBuffer(),
newKV.getTagsOffset(), kv.getTagsLength());
}
// copy in row, family, and qualifier
System.arraycopy(kv.getBuffer(), kv.getRowOffset(),
@ -4704,6 +4715,11 @@ public class HRegion implements HeapSize { // , Writable{
kv.getQualifierLength());
newKV.setMvccVersion(w.getWriteNumber());
// Give coprocessors a chance to update the new cell
if (coprocessorHost != null) {
newKV = KeyValueUtil.ensureKeyValue(coprocessorHost.postMutationBeforeWAL(
RegionObserver.MutationType.APPEND, append, oldKv, (Cell) newKV));
}
kvs.add(newKV);
// Append update to WAL
@ -4837,8 +4853,9 @@ public class HRegion implements HeapSize { // , Writable{
int idx = 0;
for (Cell kv: family.getValue()) {
long amount = Bytes.toLong(CellUtil.cloneValue(kv));
Cell c = null;
if (idx < results.size() && CellUtil.matchingQualifier(results.get(idx), kv)) {
Cell c = results.get(idx);
c = results.get(idx);
if(c.getValueLength() == Bytes.SIZEOF_LONG) {
amount += Bytes.toLong(c.getValueArray(), c.getValueOffset(), Bytes.SIZEOF_LONG);
} else {
@ -4850,9 +4867,33 @@ public class HRegion implements HeapSize { // , Writable{
}
// Append new incremented KeyValue to list
KeyValue newKV =
new KeyValue(row, family.getKey(), CellUtil.cloneQualifier(kv), now, Bytes.toBytes(amount));
byte[] q = CellUtil.cloneQualifier(kv);
byte[] val = Bytes.toBytes(amount);
int oldCellTagsLen = (c == null) ? 0 : c.getTagsLength();
int incCellTagsLen = kv.getTagsLength();
KeyValue newKV = new KeyValue(row.length, family.getKey().length, q.length, now,
KeyValue.Type.Put, val.length, oldCellTagsLen + incCellTagsLen);
System.arraycopy(row, 0, newKV.getBuffer(), newKV.getRowOffset(), row.length);
System.arraycopy(family.getKey(), 0, newKV.getBuffer(), newKV.getFamilyOffset(),
family.getKey().length);
System.arraycopy(q, 0, newKV.getBuffer(), newKV.getQualifierOffset(), q.length);
// copy in the value
System.arraycopy(val, 0, newKV.getBuffer(), newKV.getValueOffset(), val.length);
// copy tags
if (oldCellTagsLen > 0) {
System.arraycopy(c.getTagsArray(), c.getTagsOffset(), newKV.getBuffer(),
newKV.getTagsOffset(), oldCellTagsLen);
}
if (incCellTagsLen > 0) {
System.arraycopy(kv.getTagsArray(), kv.getTagsOffset(), newKV.getBuffer(),
newKV.getTagsOffset() + oldCellTagsLen, incCellTagsLen);
}
newKV.setMvccVersion(w.getWriteNumber());
// Give coprocessors a chance to update the new cell
if (coprocessorHost != null) {
newKV = KeyValueUtil.ensureKeyValue(coprocessorHost.postMutationBeforeWAL(
RegionObserver.MutationType.INCREMENT, increment, c, (Cell) newKV));
}
kvs.add(newKV);
// Prepare WAL updates

View File

@ -55,6 +55,7 @@ import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.coprocessor.RegionObserver.MutationType;
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
@ -1708,4 +1709,24 @@ public class RegionCoprocessorHost
}
return reader;
}
/**
 * Invokes {@link RegionObserver#postMutationBeforeWAL} on every loaded region
 * observer, chaining each observer's returned cell into the next observer's call.
 *
 * @param opType whether the triggering mutation is an append or an increment
 * @param mutation the client mutation being applied
 * @param oldCell the previous cell for the column (may be null)
 * @param newCell the freshly computed cell
 * @return the (possibly replaced) cell to commit to WAL/memstore
 * @throws IOException if an observer throws and the environment rethrows it
 */
public Cell postMutationBeforeWAL(MutationType opType, Mutation mutation, Cell oldCell,
    Cell newCell) throws IOException {
  ObserverContext<RegionCoprocessorEnvironment> observerCtx = null;
  for (RegionEnvironment environment : coprocessors) {
    // Skip coprocessors that are not region observers.
    if (!(environment.getInstance() instanceof RegionObserver)) {
      continue;
    }
    observerCtx = ObserverContext.createAndPrepare(environment, observerCtx);
    RegionObserver observer = (RegionObserver) environment.getInstance();
    try {
      newCell = observer.postMutationBeforeWAL(observerCtx, opType, mutation, oldCell, newCell);
    } catch (Throwable t) {
      handleCoprocessorThrowable(environment, t);
    }
    if (observerCtx.shouldComplete()) {
      break;
    }
  }
  return newCell;
}
}

View File

@ -17,13 +17,12 @@
*/
package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
@ -37,9 +36,11 @@ import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
@ -55,8 +56,10 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
/**
* Class that test tags
@ -67,6 +70,9 @@ public class TestTags {
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
@Rule
public final TestName TEST_NAME = new TestName();
@BeforeClass
public static void setUpBeforeClass() throws Exception {
Configuration conf = TEST_UTIL.getConfiguration();
@ -90,7 +96,7 @@ public class TestTags {
public void testTags() throws Exception {
HTable table = null;
try {
TableName tableName = TableName.valueOf("testTags");
TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
byte[] fam = Bytes.toBytes("info");
byte[] row = Bytes.toBytes("rowa");
// column names
@ -168,7 +174,7 @@ public class TestTags {
public void testFlushAndCompactionWithoutTags() throws Exception {
HTable table = null;
try {
TableName tableName = TableName.valueOf("testFlushAndCompactionWithoutTags");
TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
byte[] fam = Bytes.toBytes("info");
byte[] row = Bytes.toBytes("rowa");
// column names
@ -270,7 +276,7 @@ public class TestTags {
public void testFlushAndCompactionwithCombinations() throws Exception {
HTable table = null;
try {
TableName tableName = TableName.valueOf("testFlushAndCompactionwithCombinations");
TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
byte[] fam = Bytes.toBytes("info");
byte[] row = Bytes.toBytes("rowa");
// column names
@ -394,6 +400,114 @@ public class TestTags {
}
}
/**
 * HBASE-9874: tags supplied with Put/Append/Increment cells must survive the
 * server-side read-modify-write, and tags from the old and new cells must be
 * merged on the resulting cell.
 */
@Test
public void testTagsWithAppendAndIncrement() throws Exception {
  TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
  byte[] f = Bytes.toBytes("f");
  byte[] q = Bytes.toBytes("q");
  byte[] row1 = Bytes.toBytes("r1");
  byte[] row2 = Bytes.toBytes("r2");
  HTableDescriptor desc = new HTableDescriptor(tableName);
  HColumnDescriptor colDesc = new HColumnDescriptor(f);
  desc.addFamily(colDesc);
  TEST_UTIL.getHBaseAdmin().createTable(desc);
  HTable table = null;
  try {
    table = new HTable(TEST_UTIL.getConfiguration(), tableName);
    // Increment over a tagged cell: the existing tag must be preserved.
    Put put = new Put(row1);
    byte[] v = Bytes.toBytes(2L);
    put.add(f, q, v, new Tag[] { new Tag((byte) 1, "tag1") });
    table.put(put);
    Increment increment = new Increment(row1);
    increment.addColumn(f, q, 1L);
    table.increment(increment);
    KeyValue kv = getLatestCell(table, row1, f, q);
    List<Tag> tags = kv.getTags();
    assertEquals(3L, Bytes.toLong(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()));
    assertEquals(1, tags.size());
    assertEquals("tag1", Bytes.toString(tags.get(0).getValue()));
    // Increment carrying its own tag: old and new tags are merged.
    increment = new Increment(row1);
    increment.add(new KeyValue(row1, f, q, 1234L, v, new Tag[] { new Tag((byte) 1, "tag2") }));
    table.increment(increment);
    kv = getLatestCell(table, row1, f, q);
    tags = kv.getTags();
    assertEquals(5L, Bytes.toLong(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()));
    assertEquals(2, tags.size());
    assertEquals("tag1", Bytes.toString(tags.get(0).getValue()));
    assertEquals("tag2", Bytes.toString(tags.get(1).getValue()));
    // Increment with a tag over an untagged cell: only the new tag remains.
    put = new Put(row2);
    v = Bytes.toBytes(2L);
    put.add(f, q, v);
    table.put(put);
    increment = new Increment(row2);
    increment.add(new KeyValue(row2, f, q, 1234L, v, new Tag[] { new Tag((byte) 1, "tag2") }));
    table.increment(increment);
    kv = getLatestCell(table, row2, f, q);
    tags = kv.getTags();
    assertEquals(4L, Bytes.toLong(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()));
    assertEquals(1, tags.size());
    assertEquals("tag2", Bytes.toString(tags.get(0).getValue()));
    // Append to a tagged cell: the existing tag must be preserved.
    byte[] row3 = Bytes.toBytes("r3");
    put = new Put(row3);
    put.add(f, q, Bytes.toBytes("a"), new Tag[] { new Tag((byte) 1, "tag1") });
    table.put(put);
    Append append = new Append(row3);
    append.add(f, q, Bytes.toBytes("b"));
    table.append(append);
    kv = getLatestCell(table, row3, f, q);
    tags = kv.getTags();
    assertEquals(1, tags.size());
    assertEquals("tag1", Bytes.toString(tags.get(0).getValue()));
    // Append carrying its own tag: old and new tags are merged.
    append = new Append(row3);
    append.add(new KeyValue(row3, f, q, 1234L, v, new Tag[] { new Tag((byte) 1, "tag2") }));
    table.append(append);
    kv = getLatestCell(table, row3, f, q);
    tags = kv.getTags();
    assertEquals(2, tags.size());
    assertEquals("tag1", Bytes.toString(tags.get(0).getValue()));
    assertEquals("tag2", Bytes.toString(tags.get(1).getValue()));
    // Append with a tag to an untagged cell: only the new tag remains.
    byte[] row4 = Bytes.toBytes("r4");
    put = new Put(row4);
    put.add(f, q, Bytes.toBytes("a"));
    table.put(put);
    append = new Append(row4);
    append.add(new KeyValue(row4, f, q, 1234L, v, new Tag[] { new Tag((byte) 1, "tag2") }));
    table.append(append);
    kv = getLatestCell(table, row4, f, q);
    tags = kv.getTags();
    assertEquals(1, tags.size());
    assertEquals("tag2", Bytes.toString(tags.get(0).getValue()));
  } finally {
    if (table != null) {
      table.close();
    }
  }
}

/**
 * Scans from {@code startRow} and returns the latest cell for the given
 * column, always closing the scanner (the previous version of this test
 * leaked every ResultScanner it opened).
 */
private KeyValue getLatestCell(HTable table, byte[] startRow, byte[] f, byte[] q)
    throws IOException {
  Scan scan = new Scan();
  scan.setStartRow(startRow);
  ResultScanner scanner = table.getScanner(scan);
  try {
    Result result = scanner.next();
    return KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q));
  } finally {
    scanner.close();
  }
}
private void result(byte[] fam, byte[] row, byte[] qual, byte[] row2, HTable table, byte[] value,
byte[] value2, byte[] row1, byte[] value1) throws IOException {
Scan s = new Scan(row);