HBASE-16134 Introduce Cell extension for server side.

This commit is contained in:
anoopsamjohn 2016-09-27 22:55:45 +05:30
parent b0fcca6d7b
commit b644e0fb8d
15 changed files with 210 additions and 122 deletions

View File

@ -23,6 +23,7 @@ import static org.apache.hadoop.hbase.Tag.TAG_LENGTH_SIZE;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@ -377,8 +378,7 @@ public final class CellUtil {
* parts, refer to the original Cell.
*/
@InterfaceAudience.Private
private static class TagRewriteCell implements Cell, SettableSequenceId, SettableTimestamp,
HeapSize {
private static class TagRewriteCell implements ExtendedCell {
protected Cell cell;
protected byte[] tags;
@ -387,8 +387,7 @@ public final class CellUtil {
* @param tags the tags bytes. The array suppose to contain the tags bytes alone.
*/
public TagRewriteCell(Cell cell, byte[] tags) {
assert cell instanceof SettableSequenceId;
assert cell instanceof SettableTimestamp;
assert cell instanceof ExtendedCell;
assert tags != null;
this.cell = cell;
this.tags = tags;
@ -522,6 +521,28 @@ public final class CellUtil {
// The incoming cell is supposed to be SettableSequenceId type.
CellUtil.setSequenceId(cell, seqId);
}
@Override
public int write(OutputStream out, boolean withTags) throws IOException {
int len = ((ExtendedCell) this.cell).write(out, false);
if (withTags && this.tags != null) {
// Write the tagsLength 2 bytes
out.write((byte) (0xff & (this.tags.length >> 8)));
out.write((byte) (0xff & this.tags.length));
out.write(this.tags);
len += KeyValue.TAGS_LENGTH_SIZE + this.tags.length;
}
return len;
}
@Override
public int getSerializedSize(boolean withTags) {
int len = ((ExtendedCell) this.cell).getSerializedSize(false);
if (withTags && this.tags != null) {
len += KeyValue.TAGS_LENGTH_SIZE + this.tags.length;
}
return len;
}
}
/**
@ -1996,7 +2017,9 @@ public final class CellUtil {
* These cells are used in reseeks/seeks to improve the read performance.
* They are not real cells that are returned back to the clients
*/
private static abstract class EmptyByteBufferedCell extends ByteBufferedCell implements SettableSequenceId {
private static abstract class EmptyByteBufferedCell extends ByteBufferedCell
implements SettableSequenceId {
@Override
public void setSequenceId(long seqId) {
// Fake cells don't need seqId, so leaving it as a noop.

View File

@ -0,0 +1,62 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;
import java.io.IOException;
import java.io.OutputStream;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.HeapSize;
/**
* Extension to {@link Cell} with server side required functions. Server side Cell implementations
* must implement this.
* @see SettableSequenceId
* @see SettableTimestamp
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
public interface ExtendedCell extends Cell, SettableSequenceId, SettableTimestamp, HeapSize,
Cloneable {
/**
* Write this cell to an OutputStream in a {@link KeyValue} format.
* <br> KeyValue format <br>
* <code>&lt;4 bytes keylength&gt; &lt;4 bytes valuelength&gt; &lt;2 bytes rowlength&gt;
* &lt;row&gt; &lt;1 byte columnfamilylength&gt; &lt;columnfamily&gt; &lt;columnqualifier&gt;
* &lt;8 bytes timestamp&gt; &lt;1 byte keytype&gt; &lt;value&gt; &lt;2 bytes tagslength&gt;
* &lt;tags&gt;</code>
* @param out Stream to which cell has to be written
* @param withTags Whether to write tags.
* @return how many bytes are written.
* @throws IOException
*/
// TODO remove the boolean param once HBASE-16706 is done.
int write(OutputStream out, boolean withTags) throws IOException;
/**
* @param withTags Whether to write tags.
* @return Bytes count required to serialize this Cell in a {@link KeyValue} format.
* <br> KeyValue format <br>
* <code>&lt;4 bytes keylength&gt; &lt;4 bytes valuelength&gt; &lt;2 bytes rowlength&gt;
* &lt;row&gt; &lt;1 byte columnfamilylength&gt; &lt;columnfamily&gt; &lt;columnqualifier&gt;
* &lt;8 bytes timestamp&gt; &lt;1 byte keytype&gt; &lt;value&gt; &lt;2 bytes tagslength&gt;
* &lt;tags&gt;</code>
*/
// TODO remove the boolean param once HBASE-16706 is done.
int getSerializedSize(boolean withTags);
}

View File

@ -38,7 +38,6 @@ import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
@ -80,8 +79,7 @@ import com.google.common.annotations.VisibleForTesting;
* length and actual tag bytes length.
*/
@InterfaceAudience.Private
public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
SettableTimestamp, Streamable {
public class KeyValue implements ExtendedCell {
private static final ArrayList<Tag> EMPTY_ARRAY_LIST = new ArrayList<Tag>();
private static final Log LOG = LogFactory.getLog(KeyValue.class);
@ -2475,25 +2473,23 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
@Deprecated
public static long oswrite(final KeyValue kv, final OutputStream out, final boolean withTags)
throws IOException {
return kv.write(out, withTags);
}
@Override
public int write(OutputStream out) throws IOException {
return write(out, true);
ByteBufferUtils.putInt(out, kv.getSerializedSize(withTags));
return kv.write(out, withTags) + Bytes.SIZEOF_INT;
}
@Override
public int write(OutputStream out, boolean withTags) throws IOException {
// In KeyValueUtil#oswrite we do a Cell serialization as KeyValue. Any changes doing here, pls
// check KeyValueUtil#oswrite also and do necessary changes.
int length = this.length;
if (!withTags) {
length = this.getKeyLength() + this.getValueLength() + KEYVALUE_INFRASTRUCTURE_SIZE;
int len = getSerializedSize(withTags);
out.write(this.bytes, this.offset, len);
return len;
}
@Override
public int getSerializedSize(boolean withTags) {
if (withTags) {
return this.length;
}
ByteBufferUtils.putInt(out, length);
out.write(this.bytes, this.offset, length);
return length + Bytes.SIZEOF_INT;
return this.getKeyLength() + this.getValueLength() + KEYVALUE_INFRASTRUCTURE_SIZE;
}
/**
@ -2789,5 +2785,12 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
public long heapSize() {
return super.heapSize() + Bytes.SIZEOF_SHORT;
}
@Override
public int write(OutputStream out, boolean withTags) throws IOException {
// This type of Cell is used only to maintain some internal states. We never allow this type
// of Cell to be returned back over the RPC
throw new IllegalStateException("A reader should never return this type of a Cell");
}
}
}

View File

@ -594,10 +594,18 @@ public class KeyValueUtil {
return new KeyValue(bytes, 0, length);
}
public static int getSerializedSize(Cell cell, boolean withTags) {
if (cell instanceof ExtendedCell) {
return ((ExtendedCell) cell).getSerializedSize(withTags);
}
return length(cell.getRowLength(), cell.getFamilyLength(), cell.getQualifierLength(),
cell.getValueLength(), cell.getTagsLength(), withTags);
}
public static void oswrite(final Cell cell, final OutputStream out, final boolean withTags)
throws IOException {
if (cell instanceof Streamable) {
((Streamable)cell).write(out, withTags);
if (cell instanceof ExtendedCell) {
((ExtendedCell)cell).write(out, withTags);
} else {
short rlen = cell.getRowLength();
byte flen = cell.getFamilyLength();
@ -605,8 +613,6 @@ public class KeyValueUtil {
int vlen = cell.getValueLength();
int tlen = cell.getTagsLength();
// write total length
ByteBufferUtils.putInt(out, length(rlen, flen, qlen, vlen, tlen, withTags));
// write key length
ByteBufferUtils.putInt(out, keyLength(rlen, flen, qlen));
// write value length

View File

@ -23,8 +23,6 @@ import java.io.IOException;
import java.io.OutputStream;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
/**
* An extension of the KeyValue where the tags length is always 0
@ -42,10 +40,12 @@ public class NoTagsKeyValue extends KeyValue {
@Override
public int write(OutputStream out, boolean withTags) throws IOException {
// In KeyValueUtil#oswrite we do a Cell serialization as KeyValue. Any changes doing here, pls
// check KeyValueUtil#oswrite also and do necessary changes.
ByteBufferUtils.putInt(out, this.length);
out.write(this.bytes, this.offset, this.length);
return this.length + Bytes.SIZEOF_INT;
return this.length;
}
@Override
public int getSerializedSize(boolean withTags) {
return this.length;
}
}

View File

@ -22,7 +22,6 @@ import java.io.OutputStream;
import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
@ -32,8 +31,7 @@ import org.apache.hadoop.hbase.util.ClassSize;
* memory.
*/
@InterfaceAudience.Private
public class OffheapKeyValue extends ByteBufferedCell
implements HeapSize, SettableSequenceId, Streamable {
public class OffheapKeyValue extends ByteBufferedCell implements ExtendedCell {
protected final ByteBuffer buf;
protected final int offset;
@ -241,25 +239,36 @@ public class OffheapKeyValue extends ByteBufferedCell
}
@Override
public int write(OutputStream out) throws IOException {
return write(out, true);
public int write(OutputStream out, boolean withTags) throws IOException {
int length = getSerializedSize(withTags);
ByteBufferUtils.copyBufferToStream(out, this.buf, this.offset, length);
return length;
}
@Override
public int write(OutputStream out, boolean withTags) throws IOException {
// In KeyValueUtil#oswrite we do a Cell serialization as KeyValue. Any
// changes doing here, pls check KeyValueUtil#oswrite also and do necessary changes.
int length = this.length;
if (hasTags && !withTags) {
length = keyLen + this.getValueLength() + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE;
public int getSerializedSize(boolean withTags) {
if (withTags) {
return this.length;
}
ByteBufferUtils.putInt(out, length);
ByteBufferUtils.copyBufferToStream(out, this.buf, this.offset, length);
return length + Bytes.SIZEOF_INT;
return this.keyLen + this.getValueLength() + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE;
}
@Override
public String toString() {
return CellUtil.toString(this, true);
}
@Override
public void setTimestamp(long ts) throws IOException {
// This Cell implementation is not yet used in write path.
// TODO when doing HBASE-15179
throw new UnsupportedOperationException();
}
@Override
public void setTimestamp(byte[] ts, int tsOffset) throws IOException {
// This Cell implementation is not yet used in write path.
// TODO when doing HBASE-15179
throw new UnsupportedOperationException();
}
}

View File

@ -24,8 +24,10 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
/**
* Using this Interface one can mark a Cell as Sequence stampable. <br>
* Note : Make sure to make Cell implementation of this type in server side.
* @deprecated as of 2.0 and will be removed in 3.0. Use {@link ExtendedCell} instead
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
@Deprecated
public interface SettableSequenceId {
/**

View File

@ -24,8 +24,10 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
/**
* Using this Interface one can mark a Cell as timestamp changeable. <br>
* Note : Server side Cell implementations in write path must implement this.
* @deprecated as of 2.0 and will be removed in 3.0. Use {@link ExtendedCell} instead
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
@Deprecated
public interface SettableTimestamp {
/**

View File

@ -23,8 +23,6 @@ import java.io.IOException;
import java.io.OutputStream;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
/**
* This class is an extension to ContentSizeCachedKeyValue where there are no tags in Cell.
@ -45,8 +43,12 @@ public class SizeCachedNoTagsKeyValue extends SizeCachedKeyValue {
@Override
public int write(OutputStream out, boolean withTags) throws IOException {
ByteBufferUtils.putInt(out, this.length);
out.write(this.bytes, this.offset, this.length);
return this.length + Bytes.SIZEOF_INT;
return this.length;
}
@Override
public int getSerializedSize(boolean withTags) {
return this.length;
}
}

View File

@ -1,47 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;
import java.io.IOException;
import java.io.OutputStream;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
/**
* This marks a Cell as streamable to a given OutputStream.
*/
@InterfaceAudience.Private
public interface Streamable {
/**
* Write this cell to an OutputStream.
* @param out Stream to which cell has to be written
* @return how many bytes are written.
* @throws IOException
*/
int write(OutputStream out) throws IOException;
/**
* Write this cell to an OutputStream.
* @param out Stream to which cell has to be written
* @param withTags Whether to write tags.
* @return how many bytes are written.
* @throws IOException
*/
int write(OutputStream out, boolean withTags) throws IOException;
}

View File

@ -60,6 +60,7 @@ public class KeyValueCodec implements Codec {
public void write(Cell cell) throws IOException {
checkFlushed();
// Do not write tags over RPC
ByteBufferUtils.putInt(this.out, KeyValueUtil.getSerializedSize(cell, false));
KeyValueUtil.oswrite(cell, out, false);
}
}

View File

@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
/**
* Codec that does KeyValue version 1 serialization with serializing tags also.
@ -61,6 +62,7 @@ public class KeyValueCodecWithTags implements Codec {
public void write(Cell cell) throws IOException {
checkFlushed();
// Write tags
ByteBufferUtils.putInt(this.out, KeyValueUtil.getSerializedSize(cell, true));
KeyValueUtil.oswrite(cell, out, true);
}
}

View File

@ -26,14 +26,12 @@ import org.apache.hadoop.hbase.ByteBufferedCell;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.SettableSequenceId;
import org.apache.hadoop.hbase.Streamable;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.TagCompressionContext;
import org.apache.hadoop.hbase.io.util.LRUDictionary;
import org.apache.hadoop.hbase.io.util.StreamUtils;
@ -280,8 +278,7 @@ abstract class BufferedDataBlockEncoder extends AbstractDataBlockEncoder {
*/
// We return this as a Cell to the upper layers of read flow and might try setting a new SeqId
// there. So this has to be an instance of SettableSequenceId.
protected static class OnheapDecodedCell implements Cell, HeapSize, SettableSequenceId,
Streamable {
protected static class OnheapDecodedCell implements ExtendedCell {
private static final long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT
+ (3 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG) + (7 * Bytes.SIZEOF_INT)
+ (Bytes.SIZEOF_SHORT) + (2 * Bytes.SIZEOF_BYTE) + (3 * ClassSize.ARRAY));
@ -428,16 +425,9 @@ abstract class BufferedDataBlockEncoder extends AbstractDataBlockEncoder {
return FIXED_OVERHEAD + rowLength + familyLength + qualifierLength + valueLength + tagsLength;
}
@Override
public int write(OutputStream out) throws IOException {
return write(out, true);
}
@Override
public int write(OutputStream out, boolean withTags) throws IOException {
int lenToWrite = KeyValueUtil.length(rowLength, familyLength, qualifierLength, valueLength,
tagsLength, withTags);
ByteBufferUtils.putInt(out, lenToWrite);
int lenToWrite = getSerializedSize(withTags);
ByteBufferUtils.putInt(out, keyOnlyBuffer.length);
ByteBufferUtils.putInt(out, valueLength);
// Write key
@ -452,12 +442,29 @@ abstract class BufferedDataBlockEncoder extends AbstractDataBlockEncoder {
out.write((byte) (0xff & this.tagsLength));
out.write(this.tagsBuffer, this.tagsOffset, this.tagsLength);
}
return lenToWrite + Bytes.SIZEOF_INT;
return lenToWrite;
}
@Override
public int getSerializedSize(boolean withTags) {
return KeyValueUtil.length(rowLength, familyLength, qualifierLength, valueLength, tagsLength,
withTags);
}
@Override
public void setTimestamp(long ts) throws IOException {
// This is not used in actual flow. Throwing UnsupportedOperationException
throw new UnsupportedOperationException();
}
@Override
public void setTimestamp(byte[] ts, int tsOffset) throws IOException {
// This is not used in actual flow. Throwing UnsupportedOperationException
throw new UnsupportedOperationException();
}
}
protected static class OffheapDecodedCell extends ByteBufferedCell implements HeapSize,
SettableSequenceId, Streamable {
protected static class OffheapDecodedCell extends ByteBufferedCell implements ExtendedCell {
private static final long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT
+ (3 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG) + (7 * Bytes.SIZEOF_INT)
+ (Bytes.SIZEOF_SHORT) + (2 * Bytes.SIZEOF_BYTE) + (3 * ClassSize.BYTE_BUFFER));
@ -651,16 +658,9 @@ abstract class BufferedDataBlockEncoder extends AbstractDataBlockEncoder {
this.seqId = seqId;
}
@Override
public int write(OutputStream out) throws IOException {
return write(out, true);
}
@Override
public int write(OutputStream out, boolean withTags) throws IOException {
int lenToWrite = KeyValueUtil.length(rowLength, familyLength, qualifierLength, valueLength,
tagsLength, withTags);
ByteBufferUtils.putInt(out, lenToWrite);
int lenToWrite = getSerializedSize(withTags);
ByteBufferUtils.putInt(out, keyBuffer.capacity());
ByteBufferUtils.putInt(out, valueLength);
// Write key
@ -675,7 +675,25 @@ abstract class BufferedDataBlockEncoder extends AbstractDataBlockEncoder {
out.write((byte) (0xff & this.tagsLength));
ByteBufferUtils.copyBufferToStream(out, this.tagsBuffer, this.tagsOffset, this.tagsLength);
}
return lenToWrite + Bytes.SIZEOF_INT;
return lenToWrite;
}
@Override
public int getSerializedSize(boolean withTags) {
return KeyValueUtil.length(rowLength, familyLength, qualifierLength, valueLength, tagsLength,
withTags);
}
@Override
public void setTimestamp(long ts) throws IOException {
// This is not used in actual flow. Throwing UnsupportedOperationException
throw new UnsupportedOperationException();
}
@Override
public void setTimestamp(byte[] ts, int tsOffset) throws IOException {
// This is not used in actual flow. Throwing UnsupportedOperationException
throw new UnsupportedOperationException();
}
}

View File

@ -34,6 +34,7 @@ import junit.framework.TestCase;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import static org.junit.Assert.assertNotEquals;
@ -569,7 +570,9 @@ public class TestKeyValue extends TestCase {
MockKeyValue mkvA2 = new MockKeyValue(kvA2);
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
DataOutputStream os = new DataOutputStream(byteArrayOutputStream);
ByteBufferUtils.putInt(os, KeyValueUtil.getSerializedSize(mkvA1, true));
KeyValueUtil.oswrite(mkvA1, os, true);
ByteBufferUtils.putInt(os, KeyValueUtil.getSerializedSize(mkvA2, true));
KeyValueUtil.oswrite(mkvA2, os, true);
DataInputStream is = new DataInputStream(new ByteArrayInputStream(
byteArrayOutputStream.toByteArray()));

View File

@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags;
import org.apache.hadoop.hbase.io.ByteBufferInputStream;
import org.apache.hadoop.hbase.io.util.Dictionary;
import org.apache.hadoop.hbase.io.util.StreamUtils;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.io.IOUtils;
@ -339,6 +340,7 @@ public class WALCellCodec implements Codec {
public void write(Cell cell) throws IOException {
checkFlushed();
// Make sure to write tags into WAL
ByteBufferUtils.putInt(this.out, KeyValueUtil.getSerializedSize(cell, true));
KeyValueUtil.oswrite(cell, this.out, true);
}
}