From b644e0fb8d725b9a09665d7b92544eb32155c689 Mon Sep 17 00:00:00 2001 From: anoopsamjohn Date: Tue, 27 Sep 2016 22:55:45 +0530 Subject: [PATCH] HBASE-16134 Introduce Cell extension for server side. --- .../org/apache/hadoop/hbase/CellUtil.java | 33 +++++++-- .../org/apache/hadoop/hbase/ExtendedCell.java | 62 +++++++++++++++++ .../org/apache/hadoop/hbase/KeyValue.java | 37 +++++----- .../org/apache/hadoop/hbase/KeyValueUtil.java | 14 ++-- .../apache/hadoop/hbase/NoTagsKeyValue.java | 12 ++-- .../apache/hadoop/hbase/OffheapKeyValue.java | 37 ++++++---- .../hadoop/hbase/SettableSequenceId.java | 2 + .../hadoop/hbase/SettableTimestamp.java | 2 + .../hbase/SizeCachedNoTagsKeyValue.java | 10 +-- .../org/apache/hadoop/hbase/Streamable.java | 47 ------------- .../hadoop/hbase/codec/KeyValueCodec.java | 1 + .../hbase/codec/KeyValueCodecWithTags.java | 2 + .../io/encoding/BufferedDataBlockEncoder.java | 68 ++++++++++++------- .../org/apache/hadoop/hbase/TestKeyValue.java | 3 + .../hbase/regionserver/wal/WALCellCodec.java | 2 + 15 files changed, 210 insertions(+), 122 deletions(-) create mode 100644 hbase-common/src/main/java/org/apache/hadoop/hbase/ExtendedCell.java delete mode 100644 hbase-common/src/main/java/org/apache/hadoop/hbase/Streamable.java diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java index 38c583837ae..097b11b58be 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java @@ -23,6 +23,7 @@ import static org.apache.hadoop.hbase.Tag.TAG_LENGTH_SIZE; import java.io.DataOutputStream; import java.io.IOException; +import java.io.OutputStream; import java.math.BigDecimal; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -377,8 +378,7 @@ public final class CellUtil { * parts, refer to the original Cell. */ @InterfaceAudience.Private - private static class TagRewriteCell implements Cell, SettableSequenceId, SettableTimestamp, - HeapSize { + private static class TagRewriteCell implements ExtendedCell { protected Cell cell; protected byte[] tags; @@ -387,8 +387,7 @@ public final class CellUtil { * @param tags the tags bytes. The array suppose to contain the tags bytes alone. */ public TagRewriteCell(Cell cell, byte[] tags) { - assert cell instanceof SettableSequenceId; - assert cell instanceof SettableTimestamp; + assert cell instanceof ExtendedCell; assert tags != null; this.cell = cell; this.tags = tags; @@ -522,6 +521,28 @@ public final class CellUtil { // The incoming cell is supposed to be SettableSequenceId type. CellUtil.setSequenceId(cell, seqId); } + + @Override + public int write(OutputStream out, boolean withTags) throws IOException { + int len = ((ExtendedCell) this.cell).write(out, false); + if (withTags && this.tags != null) { + // Write the tagsLength 2 bytes + out.write((byte) (0xff & (this.tags.length >> 8))); + out.write((byte) (0xff & this.tags.length)); + out.write(this.tags); + len += KeyValue.TAGS_LENGTH_SIZE + this.tags.length; + } + return len; + } + + @Override + public int getSerializedSize(boolean withTags) { + int len = ((ExtendedCell) this.cell).getSerializedSize(false); + if (withTags && this.tags != null) { + len += KeyValue.TAGS_LENGTH_SIZE + this.tags.length; + } + return len; + } } /** @@ -1996,7 +2017,9 @@ public final class CellUtil { * These cells are used in reseeks/seeks to improve the read performance. * They are not real cells that are returned back to the clients */ - private static abstract class EmptyByteBufferedCell extends ByteBufferedCell implements SettableSequenceId { + private static abstract class EmptyByteBufferedCell extends ByteBufferedCell + implements SettableSequenceId { + @Override public void setSequenceId(long seqId) { // Fake cells don't need seqId, so leaving it as a noop. diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/ExtendedCell.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/ExtendedCell.java new file mode 100644 index 00000000000..51639daf913 --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/ExtendedCell.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + +import java.io.IOException; +import java.io.OutputStream; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.io.HeapSize; + +/** + * Extension to {@link Cell} with server side required functions. Server side Cell implementations + * must implement this. + * @see SettableSequenceId + * @see SettableTimestamp + */ +@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC) +public interface ExtendedCell extends Cell, SettableSequenceId, SettableTimestamp, HeapSize, + Cloneable { + + /** + * Write this cell to an OutputStream in a {@link KeyValue} format. + *
KeyValue format
+ * <4 bytes keylength> <4 bytes valuelength> <2 bytes rowlength> + * <row> <1 byte columnfamilylength> <columnfamily> <columnqualifier> + * <8 bytes timestamp> <1 byte keytype> <value> <2 bytes tagslength> + * <tags> + * @param out Stream to which cell has to be written + * @param withTags Whether to write tags. + * @return how many bytes are written. + * @throws IOException + */ + // TODO remove the boolean param once HBASE-16706 is done. + int write(OutputStream out, boolean withTags) throws IOException; + + /** + * @param withTags Whether to write tags. + * @return Bytes count required to serialize this Cell in a {@link KeyValue} format. + *
KeyValue format
+ * <4 bytes keylength> <4 bytes valuelength> <2 bytes rowlength> + * <row> <1 byte columnfamilylength> <columnfamily> <columnqualifier> + * <8 bytes timestamp> <1 byte keytype> <value> <2 bytes tagslength> + * <tags> + */ + // TODO remove the boolean param once HBASE-16706 is done. + int getSerializedSize(boolean withTags); +} \ No newline at end of file diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java index 0c33a961ed9..47fad8620ba 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java @@ -38,7 +38,6 @@ import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.util.ByteBufferUtils; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ClassSize; @@ -80,8 +79,7 @@ import com.google.common.annotations.VisibleForTesting; * length and actual tag bytes length. */ @InterfaceAudience.Private -public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId, - SettableTimestamp, Streamable { +public class KeyValue implements ExtendedCell { private static final ArrayList EMPTY_ARRAY_LIST = new ArrayList(); private static final Log LOG = LogFactory.getLog(KeyValue.class); @@ -2475,25 +2473,23 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId, @Deprecated public static long oswrite(final KeyValue kv, final OutputStream out, final boolean withTags) throws IOException { - return kv.write(out, withTags); - } - - @Override - public int write(OutputStream out) throws IOException { - return write(out, true); + ByteBufferUtils.putInt(out, kv.getSerializedSize(withTags)); + return kv.write(out, withTags) + Bytes.SIZEOF_INT; } @Override public int write(OutputStream out, boolean withTags) throws IOException { - // In KeyValueUtil#oswrite we do a Cell serialization as KeyValue. Any changes doing here, pls - // check KeyValueUtil#oswrite also and do necessary changes. - int length = this.length; - if (!withTags) { - length = this.getKeyLength() + this.getValueLength() + KEYVALUE_INFRASTRUCTURE_SIZE; + int len = getSerializedSize(withTags); + out.write(this.bytes, this.offset, len); + return len; + } + + @Override + public int getSerializedSize(boolean withTags) { + if (withTags) { + return this.length; } - ByteBufferUtils.putInt(out, length); - out.write(this.bytes, this.offset, length); - return length + Bytes.SIZEOF_INT; + return this.getKeyLength() + this.getValueLength() + KEYVALUE_INFRASTRUCTURE_SIZE; } /** @@ -2789,5 +2785,12 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId, public long heapSize() { return super.heapSize() + Bytes.SIZEOF_SHORT; } + + @Override + public int write(OutputStream out, boolean withTags) throws IOException { + // This type of Cell is used only to maintain some internal states. We never allow this type + // of Cell to be returned back over the RPC + throw new IllegalStateException("A reader should never return this type of a Cell"); + } } } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java index 6b740a7ffb0..b723f58d51e 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java @@ -594,10 +594,18 @@ public class KeyValueUtil { return new KeyValue(bytes, 0, length); } + public static int getSerializedSize(Cell cell, boolean withTags) { + if (cell instanceof ExtendedCell) { + return ((ExtendedCell) cell).getSerializedSize(withTags); + } + return length(cell.getRowLength(), cell.getFamilyLength(), cell.getQualifierLength(), + cell.getValueLength(), cell.getTagsLength(), withTags); + } + public static void oswrite(final Cell cell, final OutputStream out, final boolean withTags) throws IOException { - if (cell instanceof Streamable) { - ((Streamable)cell).write(out, withTags); + if (cell instanceof ExtendedCell) { + ((ExtendedCell)cell).write(out, withTags); } else { short rlen = cell.getRowLength(); byte flen = cell.getFamilyLength(); @@ -605,8 +613,6 @@ public class KeyValueUtil { int vlen = cell.getValueLength(); int tlen = cell.getTagsLength(); - // write total length - ByteBufferUtils.putInt(out, length(rlen, flen, qlen, vlen, tlen, withTags)); // write key length ByteBufferUtils.putInt(out, keyLength(rlen, flen, qlen)); // write value length diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/NoTagsKeyValue.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/NoTagsKeyValue.java index 1b3f1f49abf..715bc1a9606 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/NoTagsKeyValue.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/NoTagsKeyValue.java @@ -23,8 +23,6 @@ import java.io.IOException; import java.io.OutputStream; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.util.ByteBufferUtils; -import org.apache.hadoop.hbase.util.Bytes; /** * An extension of the KeyValue where the tags length is always 0 @@ -42,10 +40,12 @@ public class NoTagsKeyValue extends KeyValue { @Override public int write(OutputStream out, boolean withTags) throws IOException { - // In KeyValueUtil#oswrite we do a Cell serialization as KeyValue. Any changes doing here, pls - // check KeyValueUtil#oswrite also and do necessary changes. - ByteBufferUtils.putInt(out, this.length); out.write(this.bytes, this.offset, this.length); - return this.length + Bytes.SIZEOF_INT; + return this.length; + } + + @Override + public int getSerializedSize(boolean withTags) { + return this.length; } } \ No newline at end of file diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/OffheapKeyValue.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/OffheapKeyValue.java index d060b027196..6f80aaefae1 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/OffheapKeyValue.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/OffheapKeyValue.java @@ -22,7 +22,6 @@ import java.io.OutputStream; import java.nio.ByteBuffer; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.util.ByteBufferUtils; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ClassSize; @@ -32,8 +31,7 @@ import org.apache.hadoop.hbase.util.ClassSize; * memory. */ @InterfaceAudience.Private -public class OffheapKeyValue extends ByteBufferedCell - implements HeapSize, SettableSequenceId, Streamable { +public class OffheapKeyValue extends ByteBufferedCell implements ExtendedCell { protected final ByteBuffer buf; protected final int offset; @@ -241,25 +239,36 @@ public class OffheapKeyValue extends ByteBufferedCell } @Override - public int write(OutputStream out) throws IOException { - return write(out, true); + public int write(OutputStream out, boolean withTags) throws IOException { + int length = getSerializedSize(withTags); + ByteBufferUtils.copyBufferToStream(out, this.buf, this.offset, length); + return length; } @Override - public int write(OutputStream out, boolean withTags) throws IOException { - // In KeyValueUtil#oswrite we do a Cell serialization as KeyValue. Any - // changes doing here, pls check KeyValueUtil#oswrite also and do necessary changes. - int length = this.length; - if (hasTags && !withTags) { - length = keyLen + this.getValueLength() + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE; + public int getSerializedSize(boolean withTags) { + if (withTags) { + return this.length; } - ByteBufferUtils.putInt(out, length); - ByteBufferUtils.copyBufferToStream(out, this.buf, this.offset, length); - return length + Bytes.SIZEOF_INT; + return this.keyLen + this.getValueLength() + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE; } @Override public String toString() { return CellUtil.toString(this, true); } + + @Override + public void setTimestamp(long ts) throws IOException { + // This Cell implementation is not yet used in write path. + // TODO when doing HBASE-15179 + throw new UnsupportedOperationException(); + } + + @Override + public void setTimestamp(byte[] ts, int tsOffset) throws IOException { + // This Cell implementation is not yet used in write path. + // TODO when doing HBASE-15179 + throw new UnsupportedOperationException(); + } } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/SettableSequenceId.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/SettableSequenceId.java index 352028a62d8..fcf4ac48448 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/SettableSequenceId.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/SettableSequenceId.java @@ -24,8 +24,10 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; /** * Using this Interface one can mark a Cell as Sequence stampable.
* Note : Make sure to make Cell implementation of this type in server side. + * @deprecated as of 2.0 and will be removed in 3.0. Use {@link ExtendedCell} instead */ @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC) +@Deprecated public interface SettableSequenceId { /** diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/SettableTimestamp.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/SettableTimestamp.java index 6dac5ae0bca..8637db2ca0d 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/SettableTimestamp.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/SettableTimestamp.java @@ -24,8 +24,10 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; /** * Using this Interface one can mark a Cell as timestamp changeable.
* Note : Server side Cell implementations in write path must implement this. + * @deprecated as of 2.0 and will be removed in 3.0. Use {@link ExtendedCell} instead */ @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC) +@Deprecated public interface SettableTimestamp { /** diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/SizeCachedNoTagsKeyValue.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/SizeCachedNoTagsKeyValue.java index d28d1a8af85..322c6680985 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/SizeCachedNoTagsKeyValue.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/SizeCachedNoTagsKeyValue.java @@ -23,8 +23,6 @@ import java.io.IOException; import java.io.OutputStream; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.util.ByteBufferUtils; -import org.apache.hadoop.hbase.util.Bytes; /** * This class is an extension to ContentSizeCachedKeyValue where there are no tags in Cell. @@ -45,8 +43,12 @@ public class SizeCachedNoTagsKeyValue extends SizeCachedKeyValue { @Override public int write(OutputStream out, boolean withTags) throws IOException { - ByteBufferUtils.putInt(out, this.length); out.write(this.bytes, this.offset, this.length); - return this.length + Bytes.SIZEOF_INT; + return this.length; + } + + @Override + public int getSerializedSize(boolean withTags) { + return this.length; } } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/Streamable.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/Streamable.java deleted file mode 100644 index be91a569c4f..00000000000 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/Streamable.java +++ /dev/null @@ -1,47 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase; - -import java.io.IOException; -import java.io.OutputStream; - -import org.apache.hadoop.hbase.classification.InterfaceAudience; - -/** - * This marks a Cell as streamable to a given OutputStream. - */ -@InterfaceAudience.Private -public interface Streamable { - - /** - * Write this cell to an OutputStream. - * @param out Stream to which cell has to be written - * @return how many bytes are written. - * @throws IOException - */ - int write(OutputStream out) throws IOException; - - /** - * Write this cell to an OutputStream. - * @param out Stream to which cell has to be written - * @param withTags Whether to write tags. - * @return how many bytes are written. - * @throws IOException - */ - int write(OutputStream out, boolean withTags) throws IOException; -} \ No newline at end of file diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodec.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodec.java index 5165f58cdcc..260939845a3 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodec.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodec.java @@ -60,6 +60,7 @@ public class KeyValueCodec implements Codec { public void write(Cell cell) throws IOException { checkFlushed(); // Do not write tags over RPC + ByteBufferUtils.putInt(this.out, KeyValueUtil.getSerializedSize(cell, false)); KeyValueUtil.oswrite(cell, out, false); } } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodecWithTags.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodecWithTags.java index 8d2ee99337c..63c02e8ffd2 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodecWithTags.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodecWithTags.java @@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.ByteBufferUtils; /** * Codec that does KeyValue version 1 serialization with serializing tags also. @@ -61,6 +62,7 @@ public class KeyValueCodecWithTags implements Codec { public void write(Cell cell) throws IOException { checkFlushed(); // Write tags + ByteBufferUtils.putInt(this.out, KeyValueUtil.getSerializedSize(cell, true)); KeyValueUtil.oswrite(cell, out, true); } } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java index 05ae4a2329a..9f214ccb9a4 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java @@ -26,14 +26,12 @@ import org.apache.hadoop.hbase.ByteBufferedCell; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.ExtendedCell; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue.Type; import org.apache.hadoop.hbase.KeyValueUtil; -import org.apache.hadoop.hbase.SettableSequenceId; -import org.apache.hadoop.hbase.Streamable; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.io.TagCompressionContext; import org.apache.hadoop.hbase.io.util.LRUDictionary; import org.apache.hadoop.hbase.io.util.StreamUtils; @@ -280,8 +278,7 @@ abstract class BufferedDataBlockEncoder extends AbstractDataBlockEncoder { */ // We return this as a Cell to the upper layers of read flow and might try setting a new SeqId // there. So this has to be an instance of SettableSequenceId. - protected static class OnheapDecodedCell implements Cell, HeapSize, SettableSequenceId, - Streamable { + protected static class OnheapDecodedCell implements ExtendedCell { private static final long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT + (3 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG) + (7 * Bytes.SIZEOF_INT) + (Bytes.SIZEOF_SHORT) + (2 * Bytes.SIZEOF_BYTE) + (3 * ClassSize.ARRAY)); @@ -428,16 +425,9 @@ abstract class BufferedDataBlockEncoder extends AbstractDataBlockEncoder { return FIXED_OVERHEAD + rowLength + familyLength + qualifierLength + valueLength + tagsLength; } - @Override - public int write(OutputStream out) throws IOException { - return write(out, true); - } - @Override public int write(OutputStream out, boolean withTags) throws IOException { - int lenToWrite = KeyValueUtil.length(rowLength, familyLength, qualifierLength, valueLength, - tagsLength, withTags); - ByteBufferUtils.putInt(out, lenToWrite); + int lenToWrite = getSerializedSize(withTags); ByteBufferUtils.putInt(out, keyOnlyBuffer.length); ByteBufferUtils.putInt(out, valueLength); // Write key @@ -452,12 +442,29 @@ abstract class BufferedDataBlockEncoder extends AbstractDataBlockEncoder { out.write((byte) (0xff & this.tagsLength)); out.write(this.tagsBuffer, this.tagsOffset, this.tagsLength); } - return lenToWrite + Bytes.SIZEOF_INT; + return lenToWrite; + } + + @Override + public int getSerializedSize(boolean withTags) { + return KeyValueUtil.length(rowLength, familyLength, qualifierLength, valueLength, tagsLength, + withTags); + } + + @Override + public void setTimestamp(long ts) throws IOException { + // This is not used in actual flow. Throwing UnsupportedOperationException + throw new UnsupportedOperationException(); + } + + @Override + public void setTimestamp(byte[] ts, int tsOffset) throws IOException { + // This is not used in actual flow. Throwing UnsupportedOperationException + throw new UnsupportedOperationException(); } } - protected static class OffheapDecodedCell extends ByteBufferedCell implements HeapSize, - SettableSequenceId, Streamable { + protected static class OffheapDecodedCell extends ByteBufferedCell implements ExtendedCell { private static final long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT + (3 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG) + (7 * Bytes.SIZEOF_INT) + (Bytes.SIZEOF_SHORT) + (2 * Bytes.SIZEOF_BYTE) + (3 * ClassSize.BYTE_BUFFER)); @@ -651,16 +658,9 @@ abstract class BufferedDataBlockEncoder extends AbstractDataBlockEncoder { this.seqId = seqId; } - @Override - public int write(OutputStream out) throws IOException { - return write(out, true); - } - @Override public int write(OutputStream out, boolean withTags) throws IOException { - int lenToWrite = KeyValueUtil.length(rowLength, familyLength, qualifierLength, valueLength, - tagsLength, withTags); - ByteBufferUtils.putInt(out, lenToWrite); + int lenToWrite = getSerializedSize(withTags); ByteBufferUtils.putInt(out, keyBuffer.capacity()); ByteBufferUtils.putInt(out, valueLength); // Write key @@ -675,7 +675,25 @@ abstract class BufferedDataBlockEncoder extends AbstractDataBlockEncoder { out.write((byte) (0xff & this.tagsLength)); ByteBufferUtils.copyBufferToStream(out, this.tagsBuffer, this.tagsOffset, this.tagsLength); } - return lenToWrite + Bytes.SIZEOF_INT; + return lenToWrite; + } + + @Override + public int getSerializedSize(boolean withTags) { + return KeyValueUtil.length(rowLength, familyLength, qualifierLength, valueLength, tagsLength, + withTags); + } + + @Override + public void setTimestamp(long ts) throws IOException { + // This is not used in actual flow. Throwing UnsupportedOperationException + throw new UnsupportedOperationException(); + } + + @Override + public void setTimestamp(byte[] ts, int tsOffset) throws IOException { + // This is not used in actual flow. Throwing UnsupportedOperationException + throw new UnsupportedOperationException(); } } diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/TestKeyValue.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/TestKeyValue.java index e2333486361..4e0090d24f3 100644 --- a/hbase-common/src/test/java/org/apache/hadoop/hbase/TestKeyValue.java +++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/TestKeyValue.java @@ -34,6 +34,7 @@ import junit.framework.TestCase; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.KeyValue.Type; +import org.apache.hadoop.hbase.util.ByteBufferUtils; import org.apache.hadoop.hbase.util.Bytes; import static org.junit.Assert.assertNotEquals; @@ -569,7 +570,9 @@ public class TestKeyValue extends TestCase { MockKeyValue mkvA2 = new MockKeyValue(kvA2); ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); DataOutputStream os = new DataOutputStream(byteArrayOutputStream); + ByteBufferUtils.putInt(os, KeyValueUtil.getSerializedSize(mkvA1, true)); KeyValueUtil.oswrite(mkvA1, os, true); + ByteBufferUtils.putInt(os, KeyValueUtil.getSerializedSize(mkvA2, true)); KeyValueUtil.oswrite(mkvA2, os, true); DataInputStream is = new DataInputStream(new ByteArrayInputStream( byteArrayOutputStream.toByteArray())); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java index 87850aa1e64..7f51b7acde7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java @@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags; import org.apache.hadoop.hbase.io.ByteBufferInputStream; import org.apache.hadoop.hbase.io.util.Dictionary; import org.apache.hadoop.hbase.io.util.StreamUtils; +import org.apache.hadoop.hbase.util.ByteBufferUtils; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.hadoop.io.IOUtils; @@ -339,6 +340,7 @@ public class WALCellCodec implements Codec { public void write(Cell cell) throws IOException { checkFlushed(); // Make sure to write tags into WAL + ByteBufferUtils.putInt(this.out, KeyValueUtil.getSerializedSize(cell, true)); KeyValueUtil.oswrite(cell, this.out, true); } }