HBASE-14882: Provide a Put API that adds the provided family, qualifier, value without copying

Signed-off-by: anoopsamjohn <anoopsamjohn@gmail.com>
This commit is contained in:
Xiang Li 2016-11-08 17:50:11 +08:00 committed by anoopsamjohn
parent 0d6b872d9d
commit 004f0abb46
4 changed files with 538 additions and 2 deletions

View File

@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.IndividualBytesFieldCell;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
@ -227,12 +228,18 @@ public class Put extends Mutation implements HeapSize, Comparable<Row> {
* for internal use by HBase and for advanced client applications.
*/
public Put addImmutable(byte [] family, byte [] qualifier, long ts, byte [] value) {
  // Family can not be null, otherwise NullPointerException is thrown when putting the cell into
  // familyMap.
  if (family == null) {
    throw new IllegalArgumentException("Family cannot be null");
  }

  // Check timestamp before anything is added, so a rejected call leaves this Put untouched.
  if (ts < 0) {
    throw new IllegalArgumentException("Timestamp cannot be negative. ts=" + ts);
  }

  List<Cell> list = getCellList(family);
  // Add exactly one cell per call. IndividualBytesFieldCell keeps references to the arrays it is
  // given, so the cell returned for this column is backed by the caller's family, qualifier and
  // value arrays. Adding a cell built by createPutKeyValue() as well would store the same column
  // twice, with that other cell ahead of this one in the list.
  list.add(new IndividualBytesFieldCell(this.row, family, qualifier, ts, KeyValue.Type.Put, value));
  familyMap.put(family, list);
  return this;
}

View File

@ -22,6 +22,8 @@ package org.apache.hadoop.hbase.client;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.Cell;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@ -61,4 +63,44 @@ public class TestPut {
Put putRowIsNotImmutable = new Put(rowKey, 1000L, false);
assertTrue(rowKey != putRowIsNotImmutable.getRow()); // A local copy is made
}
// HBASE-14882
@Test
public void testAddImmutable() {
  // Arrays handed to the Put; the test checks they come back by reference.
  final byte[] rowKey = Bytes.toBytes("immutable-row");
  final byte[] cf = Bytes.toBytes("immutable-family");
  final byte[][] qualifiers = {
    Bytes.toBytes("immutable-qualifier-0"),
    Bytes.toBytes("immutable-qualifier-1")
  };
  final byte[][] values = {
    Bytes.toBytes("immutable-value-0"),
    Bytes.toBytes("immutable-value-1")
  };
  final long explicitTs = 5000L;

  // "true" indicates that the input row is immutable
  Put immutablePut = new Put(rowKey, true);
  // Column 0 is added without a timestamp, column 1 with an explicit one.
  immutablePut.addImmutable(cf, qualifiers[0], values[0]);
  immutablePut.addImmutable(cf, qualifiers[1], explicitTs, values[1]);

  for (int i = 0; i < qualifiers.length; i++) {
    Cell stored = immutablePut.get(cf, qualifiers[i]).get(0);

    // The cell must expose the very same arrays: no local copy of family, qualifier or value.
    assertTrue(stored.getFamilyArray() == cf);
    assertTrue(stored.getQualifierArray() == qualifiers[i]);
    assertTrue(stored.getValueArray() == values[i]);

    // Column 0 takes the Put's own timestamp; column 1 keeps the one passed in.
    assertTrue(stored.getTimestamp() == ((i == 0) ? immutablePut.getTimeStamp() : explicitTs));
  }
}
}

View File

@ -0,0 +1,302 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
/**
 * A cell whose row, family, qualifier, value and tags are each backed by their own byte array.
 * <p>
 * The constructors keep references to the arrays they are given instead of copying them, so every
 * {@code getXXXOffset()} is 0 and a later change to one of those arrays is visible through this
 * cell. A {@code null} family, qualifier, value or tags array is reported by the getters as an
 * empty array of length 0.
 */
@InterfaceAudience.Private
public class IndividualBytesFieldCell implements ExtendedCell {

  // Heap cost of one instance, excluding the byte arrays the fields refer to.
  private static final long FIXED_OVERHEAD = ClassSize.align(  // do alignment(padding gap)
        ClassSize.OBJECT              // object header
      + KeyValue.TIMESTAMP_TYPE_SIZE  // timestamp and type
      + Bytes.SIZEOF_LONG             // sequence id
      + 5 * ClassSize.REFERENCE);     // references to row, family, qualifier, value, tags

  // The following fields are backed by individual byte arrays. They are assigned once, in the
  // constructor, and never replaced afterwards.
  private final byte[] row;
  private final byte[] family;
  private final byte[] qualifier;
  private final byte[] value;
  private final byte[] tags;  // A byte array, rather than an array of org.apache.hadoop.hbase.Tag

  // Other fields
  private long timestamp;     // mutable through setTimestamp()
  private final byte type;    // A byte, rather than org.apache.hadoop.hbase.KeyValue.Type
  private long seqId;         // mutable through setSequenceId()

  /**
   * Creates a cell with sequence id 0 and no tags.
   *
   * @param row       row bytes, referenced directly
   * @param family    family bytes, referenced directly; may be null
   * @param qualifier qualifier bytes, referenced directly; may be null
   * @param timestamp cell timestamp; must not be negative
   * @param type      cell type; its code is stored
   * @param value     value bytes, referenced directly; may be null
   * @throws IllegalArgumentException if the timestamp is negative
   */
  public IndividualBytesFieldCell(byte[] row, byte[] family, byte[] qualifier,
                                  long timestamp, KeyValue.Type type, byte[] value) {
    this(row, family, qualifier, timestamp, type, 0L /* sequence id */, value, null /* tags */);
  }

  /**
   * Creates a cell that references the given arrays directly.
   *
   * @param row       row bytes, referenced directly
   * @param family    family bytes, referenced directly; may be null
   * @param qualifier qualifier bytes, referenced directly; may be null
   * @param timestamp cell timestamp; must not be negative
   * @param type      cell type; its code is stored
   * @param seqId     initial sequence id (not range-checked here)
   * @param value     value bytes, referenced directly; may be null
   * @param tags      serialized tags, referenced directly; may be null
   * @throws IllegalArgumentException if the timestamp is negative
   */
  public IndividualBytesFieldCell(byte[] row, byte[] family, byte[] qualifier,
                                  long timestamp, KeyValue.Type type, long seqId,
                                  byte[] value, byte[] tags) {
    // Check row, family, qualifier and value
    KeyValue.checkParameters(row, (row == null) ? 0 : row.length,  // row and row length
        family, (family == null) ? 0 : family.length,              // family and family length
        (qualifier == null) ? 0 : qualifier.length,                // qualifier length
        (value == null) ? 0 : value.length);                       // value length

    // Check timestamp
    if (timestamp < 0) {
      throw new IllegalArgumentException("Timestamp cannot be negative. ts=" + timestamp);
    }

    // Check tags
    TagUtil.checkForTagsLength((tags == null) ? 0 : tags.length);

    // No local copy is made, but reference to the input directly
    this.row = row;
    this.family = family;
    this.qualifier = qualifier;
    this.value = value;
    this.tags = tags;

    // Set others
    this.timestamp = timestamp;
    this.type = type.getCode();
    this.seqId = seqId;
  }

  /**
   * Writes this cell to the stream in this order: key length, value length, flat key, value and,
   * only when {@code withTags} is true and there are tags, a two-byte tags length (high-order
   * byte first) followed by the tags bytes.
   *
   * @param out      stream to write to
   * @param withTags whether tags should be written
   * @return {@link #getSerializedSize(boolean)} for the same {@code withTags}
   * @throws IOException if writing to the stream fails
   */
  @Override
  public int write(OutputStream out, boolean withTags) throws IOException {
    // Key length and then value length
    ByteBufferUtils.putInt(out, KeyValueUtil.keyLength(this));
    ByteBufferUtils.putInt(out, getValueLength());

    // Key
    CellUtil.writeFlatKey(this, out);

    // Value; getValueArray() yields an empty array when value is null, so nothing is written then
    out.write(getValueArray());

    // Tags length and tags byte array; getTagsLength() > 0 implies tags is not null
    if (withTags && getTagsLength() > 0) {
      // Tags length
      out.write((byte)(0xff & (tags.length >> 8)));
      out.write((byte)(0xff & tags.length));

      // Tags byte array
      out.write(tags);
    }

    return getSerializedSize(withTags);
  }

  /**
   * Writes this cell, tags included, into {@code buf} at {@code offset} by delegating to
   * {@code KeyValueUtil.appendTo}.
   */
  @Override
  public void write(ByteBuffer buf, int offset) {
    KeyValueUtil.appendTo(this, buf, offset, true);
  }

  /**
   * @param withTags whether tags are counted
   * @return the length computed by {@code KeyValueUtil.length} from this cell's field lengths
   */
  @Override
  public int getSerializedSize(boolean withTags) {
    return KeyValueUtil.length(getRowLength(), getFamilyLength(), getQualifierLength(),
                               getValueLength(), getTagsLength(), withTags);
  }

  /**
   * @return the fixed per-instance overhead plus one array header for each backing array that is
   *         present
   */
  @Override
  public long heapOverhead() {
    return   FIXED_OVERHEAD
           + ClassSize.ARRAY                               // row      , can not be null
           + ((family    == null) ? 0 : ClassSize.ARRAY)   // family   , can be null
           + ((qualifier == null) ? 0 : ClassSize.ARRAY)   // qualifier, can be null
           + ((value     == null) ? 0 : ClassSize.ARRAY)   // value    , can be null
           + ((tags      == null) ? 0 : ClassSize.ARRAY);  // tags     , can be null
  }

  /**
   * @return a new {@link KeyValue} constructed from this cell
   */
  @Override
  public Cell deepClone() {
    // When being added to the memstore, deepClone() is called and KeyValue has less heap overhead.
    return new KeyValue(this);
  }

  /**
   * Implement Cell interface
   */
  // 1) Row
  @Override
  public byte[] getRowArray() {
    // If row is null, the constructor will reject it, by {@link KeyValue#checkParameters()},
    // so it is safe to return row without checking.
    return row;
  }

  @Override
  public int getRowOffset() {
    return 0;
  }

  @Override
  public short getRowLength() {
    // If row is null or row.length is invalid, the constructor will reject it,
    // by {@link KeyValue#checkParameters()}, so it is safe to call row.length and make the type
    // conversion.
    return (short)(row.length);
  }

  // 2) Family
  @Override
  public byte[] getFamilyArray() {
    // Family could be null
    return (family == null) ? HConstants.EMPTY_BYTE_ARRAY : family;
  }

  @Override
  public int getFamilyOffset() {
    return 0;
  }

  @Override
  public byte getFamilyLength() {
    // If family.length is invalid, the constructor will reject it,
    // by {@link KeyValue#checkParameters()}, so it is safe to make the type conversion.
    // But need to consider the condition when family is null.
    return (family == null) ? 0 : (byte)(family.length);
  }

  // 3) Qualifier
  @Override
  public byte[] getQualifierArray() {
    // Qualifier could be null
    return (qualifier == null) ? HConstants.EMPTY_BYTE_ARRAY : qualifier;
  }

  @Override
  public int getQualifierOffset() {
    return 0;
  }

  @Override
  public int getQualifierLength() {
    // Qualifier could be null
    return (qualifier == null) ? 0 : qualifier.length;
  }

  // 4) Timestamp
  @Override
  public long getTimestamp() {
    return timestamp;
  }

  // 5) Type
  @Override
  public byte getTypeByte() {
    return type;
  }

  // 6) Sequence id
  @Override
  public long getSequenceId() {
    return seqId;
  }

  // 7) Value
  @Override
  public byte[] getValueArray() {
    // Value could be null
    return (value == null) ? HConstants.EMPTY_BYTE_ARRAY : value;
  }

  @Override
  public int getValueOffset() {
    return 0;
  }

  @Override
  public int getValueLength() {
    // Value could be null
    return (value == null) ? 0 : value.length;
  }

  // 8) Tags
  @Override
  public byte[] getTagsArray() {
    // Tags could be null
    return (tags == null) ? HConstants.EMPTY_BYTE_ARRAY : tags;
  }

  @Override
  public int getTagsOffset() {
    return 0;
  }

  @Override
  public int getTagsLength() {
    // Tags could be null
    return (tags == null) ? 0 : tags.length;
  }

  /**
   * Implement HeapSize interface
   */
  @Override
  public long heapSize() {
    // Size of array headers are already included into overhead, so do not need to include it for
    // each byte array
    return   heapOverhead()                         // overhead, with array headers included
           + ClassSize.align(getRowLength())        // row
           + ClassSize.align(getFamilyLength())     // family
           + ClassSize.align(getQualifierLength())  // qualifier
           + ClassSize.align(getValueLength())      // value
           + ClassSize.align(getTagsLength());      // tags
  }

  /**
   * Implement Cloneable interface
   */
  @Override
  public Object clone() throws CloneNotSupportedException {
    return super.clone();  // only a shallow copy: the clone shares the backing byte arrays
  }

  /**
   * Implement SettableSequenceId interface
   *
   * @throws IllegalArgumentException if {@code seqId} is negative
   */
  @Override
  public void setSequenceId(long seqId) {
    if (seqId < 0) {
      // Label the rejected value as a sequence id, not as a timestamp.
      throw new IllegalArgumentException("Sequence Id cannot be negative. seqId=" + seqId);
    }
    this.seqId = seqId;
  }

  /**
   * Implement SettableTimestamp interface
   *
   * @throws IllegalArgumentException if {@code ts} is negative
   */
  @Override
  public void setTimestamp(long ts) {
    if (ts < 0) {
      throw new IllegalArgumentException("Timestamp cannot be negative. ts=" + ts);
    }
    this.timestamp = ts;
  }

  /**
   * Decodes a long from {@code ts} at {@code tsOffset} and applies it through
   * {@link #setTimestamp(long)}, so the same negative-value check applies.
   */
  @Override
  public void setTimestamp(byte[] ts, int tsOffset) {
    setTimestamp(Bytes.toLong(ts, tsOffset));
  }
}

View File

@ -0,0 +1,185 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
import org.apache.hadoop.hbase.util.Bytes;
import static org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertEquals;
@Category({MiscTests.class, SmallTests.class})
public class TestIndividualBytesFieldCell {
  // Shared fixtures: the cell under test and a KeyValue built from the same inputs.
  private static IndividualBytesFieldCell ic0 = null;
  private static KeyValue kv0 = null;

  /**
   * Builds the shared fixtures once, then checks that the cell hands back the very arrays it was
   * constructed with, reports the scalar fields it was given, and uses offset 0 everywhere.
   */
  @BeforeClass
  public static void testConstructorAndVerify() {
    // Arrays the cell is expected to reference rather than copy
    final byte[] rowBytes = Bytes.toBytes("immutable-row");
    final byte[] familyBytes = Bytes.toBytes("immutable-family");
    final byte[] qualifierBytes = Bytes.toBytes("immutable-qualifier");
    final byte[] valueBytes = Bytes.toBytes("immutable-value");
    final byte[] tagBytes = Bytes.toBytes("immutable-tags");

    // Scalar inputs
    final long ts = 5000L;
    final long sequenceId = 0L;
    final Type putType = KeyValue.Type.Put;

    ic0 = new IndividualBytesFieldCell(rowBytes, familyBytes, qualifierBytes, ts, putType,
        sequenceId, valueBytes, tagBytes);
    kv0 = new KeyValue(rowBytes, familyBytes, qualifierBytes, ts, putType, valueBytes, tagBytes);

    // Reference equality: no local copy of row, family, qualifier, value or tags.
    assertTrue(ic0.getRowArray() == rowBytes);
    assertTrue(ic0.getFamilyArray() == familyBytes);
    assertTrue(ic0.getQualifierArray() == qualifierBytes);
    assertTrue(ic0.getValueArray() == valueBytes);
    assertTrue(ic0.getTagsArray() == tagBytes);

    // Scalar fields come back unchanged.
    assertEquals(ts, ic0.getTimestamp());
    assertEquals(sequenceId, ic0.getSequenceId());
    assertEquals(putType.getCode(), ic0.getTypeByte());

    // Each backing array starts at offset 0.
    assertEquals(0, ic0.getRowOffset());
    assertEquals(0, ic0.getFamilyOffset());
    assertEquals(0, ic0.getQualifierOffset());
    assertEquals(0, ic0.getValueOffset());
    assertEquals(0, ic0.getTagsOffset());
  }

  /**
   * clone() must share the backing arrays with the original; deepClone() must yield a KeyValue.
   */
  @Test
  public void testClone() throws CloneNotSupportedException {
    IndividualBytesFieldCell copy = (IndividualBytesFieldCell) ic0.clone();

    // Shallow copy: same array instances on both sides.
    assertTrue(copy.getRowArray() == ic0.getRowArray());
    assertTrue(copy.getFamilyArray() == ic0.getFamilyArray());
    assertTrue(copy.getQualifierArray() == ic0.getQualifierArray());
    assertTrue(copy.getValueArray() == ic0.getValueArray());
    assertTrue(copy.getTagsArray() == ic0.getTagsArray());

    // Deep clone switches representation to KeyValue.
    assertTrue(ic0.deepClone() instanceof KeyValue);
  }

  /**
   * Compares write() and getSerializedSize() of the cell against {@link KeyValue} built from the
   * same inputs; both are expected to produce the same sizes and the same bytes.
   */
  @Test
  public void testWriteIntoKeyValueFormat() throws IOException {
    // Serialized sizes, with and then without tags.
    assertEquals(kv0.getSerializedSize(true), ic0.getSerializedSize(true));
    assertEquals(kv0.getSerializedSize(false), ic0.getSerializedSize(false));

    // ByteBuffer path.
    ByteBuffer cellBuffer = ByteBuffer.allocate(ic0.getSerializedSize(true));
    ic0.write(cellBuffer, 0);
    ByteBuffer keyValueBuffer = ByteBuffer.allocate(kv0.getSerializedSize(true));
    kv0.write(keyValueBuffer, 0);
    assertTrue(cellBuffer.equals(keyValueBuffer));

    // OutputStream path, with and then without tags.
    for (boolean includeTags : new boolean[] {true, false}) {
      testWriteIntoOutputStream(ic0, kv0, includeTags);
    }
  }

  /**
   * Writes both cells to in-memory streams and compares the results.
   *
   * @param ic An instance of IndividualBytesFieldCell to compare.
   * @param kv An instance of KeyValue to compare.
   * @param withTags Whether to write tags.
   * @throws IOException if writing to either stream fails
   */
  private void testWriteIntoOutputStream(IndividualBytesFieldCell ic, KeyValue kv, boolean withTags)
      throws IOException {
    ByteArrayOutputStream cellStream = new ByteArrayOutputStream(ic.getSerializedSize(withTags));
    ByteArrayOutputStream keyValueStream =
        new ByteArrayOutputStream(kv.getSerializedSize(withTags));

    // Same number of bytes reported by both writers ...
    assertEquals(kv.write(keyValueStream, withTags), ic.write(cellStream, withTags));
    // ... and identical underlying byte arrays.
    assertArrayEquals(keyValueStream.getBuffer(), cellStream.getBuffer());
  }

  /**
   * With null family, qualifier, value and tags, getXXXArray() and getXXXLength() must agree with
   * what {@link KeyValue} reports for the same inputs.
   */
  @Test
  public void testNullFamilyQualifierValueTags() {
    final byte[] rowBytes = Bytes.toBytes("row1");
    final long ts = 5000L;
    final long sequenceId = 0L;
    final Type putType = KeyValue.Type.Put;

    // Typed nulls for every optional field.
    final byte[] nullFamily = null;
    final byte[] nullQualifier = null;
    final byte[] nullValue = null;
    final byte[] nullTags = null;

    Cell ic1 = new IndividualBytesFieldCell(rowBytes, nullFamily, nullQualifier, ts, putType,
        sequenceId, nullValue, nullTags);
    Cell kv1 = new KeyValue(rowBytes, nullFamily, nullQualifier, ts, putType, nullValue, nullTags);

    // Slice each field out of the KeyValue so it can be compared as a standalone array.
    byte[] expectedFamily =
        Bytes.copy(kv1.getFamilyArray(), kv1.getFamilyOffset(), kv1.getFamilyLength());
    byte[] expectedQualifier =
        Bytes.copy(kv1.getQualifierArray(), kv1.getQualifierOffset(), kv1.getQualifierLength());
    byte[] expectedValue =
        Bytes.copy(kv1.getValueArray(), kv1.getValueOffset(), kv1.getValueLength());
    byte[] expectedTags =
        Bytes.copy(kv1.getTagsArray(), kv1.getTagsOffset(), kv1.getTagsLength());

    // getXXXArray() is supposed to return an empty byte array, rather than null.
    assertArrayEquals(expectedFamily, ic1.getFamilyArray());
    assertArrayEquals(expectedQualifier, ic1.getQualifierArray());
    assertArrayEquals(expectedValue, ic1.getValueArray());
    assertArrayEquals(expectedTags, ic1.getTagsArray());

    // getXXXLength() is supposed to return 0.
    assertEquals(kv1.getFamilyLength(), ic1.getFamilyLength());
    assertEquals(kv1.getQualifierLength(), ic1.getQualifierLength());
    assertEquals(kv1.getValueLength(), ic1.getValueLength());
    assertEquals(kv1.getTagsLength(), ic1.getTagsLength());
  }

  /** The cell must be usable wherever a SettableSequenceId is expected. */
  @Test
  public void testIfSettableSequenceIdImplemented() {
    assertTrue(ic0 instanceof SettableSequenceId);
  }

  /** The cell must be usable wherever a SettableTimestamp is expected. */
  @Test
  public void testIfSettableTimestampImplemented() {
    assertTrue(ic0 instanceof SettableTimestamp);
  }
}