diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java index 883595740c5..1b7dd6b454e 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java @@ -79,7 +79,8 @@ import com.google.common.annotations.VisibleForTesting; * and actual tag bytes length. */ @InterfaceAudience.Private -public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId, SettableTimestamp { +public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId, + SettableTimestamp, Streamable { private static final ArrayList EMPTY_ARRAY_LIST = new ArrayList(); private static final Log LOG = LogFactory.getLog(KeyValue.class); @@ -2495,28 +2496,6 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId, return length + Bytes.SIZEOF_INT; } - /** - * Write out a KeyValue in the manner in which we used to when KeyValue was a Writable but do - * not require a {@link DataOutput}, just take plain {@link OutputStream} - * Named oswrite so does not clash with {@link #write(KeyValue, DataOutput)} - * @param kv - * @param out - * @return Length written on stream - * @throws IOException - * @see #create(DataInput) for the inverse function - * @see #write(KeyValue, DataOutput) - * @deprecated use {@link #oswrite(KeyValue, OutputStream, boolean)} instead - */ - @Deprecated - public static long oswrite(final KeyValue kv, final OutputStream out) - throws IOException { - int length = kv.getLength(); - // This does same as DataOuput#writeInt (big-endian, etc.) - out.write(Bytes.toBytes(length)); - out.write(kv.getBuffer(), kv.getOffset(), length); - return length + Bytes.SIZEOF_INT; - } - /** * Write out a KeyValue in the manner in which we used to when KeyValue was a Writable but do * not require a {@link DataOutput}, just take plain {@link OutputStream} @@ -2529,18 +2508,31 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId, * @see #create(DataInput) for the inverse function * @see #write(KeyValue, DataOutput) * @see KeyValueUtil#oswrite(Cell, OutputStream, boolean) + * @deprecated As of release 2.0.0, this will be removed in HBase 3.0.0. + * Instead use {@link #write(OutputStream, boolean)} */ + @Deprecated public static long oswrite(final KeyValue kv, final OutputStream out, final boolean withTags) throws IOException { + return kv.write(out, withTags); + } + + @Override + public int write(OutputStream out) throws IOException { + return write(out, true); + } + + @Override + public int write(OutputStream out, boolean withTags) throws IOException { // In KeyValueUtil#oswrite we do a Cell serialization as KeyValue. Any changes doing here, pls // check KeyValueUtil#oswrite also and do necessary changes. - int length = kv.getLength(); + int length = this.length; if (!withTags) { - length = kv.getKeyLength() + kv.getValueLength() + KEYVALUE_INFRASTRUCTURE_SIZE; + length = this.getKeyLength() + this.getValueLength() + KEYVALUE_INFRASTRUCTURE_SIZE; } // This does same as DataOuput#writeInt (big-endian, etc.) StreamUtils.writeInt(out, length); - out.write(kv.getBuffer(), kv.getOffset(), length); + out.write(this.bytes, this.offset, length); return length + Bytes.SIZEOF_INT; } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java index c94807b8f6f..3b0c05c0d33 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java @@ -58,7 +58,7 @@ public class KeyValueUtil { cell.getValueLength(), cell.getTagsLength(), true); } - private static int length(short rlen, byte flen, int qlen, int vlen, int tlen, boolean withTags) { + public static int length(short rlen, byte flen, int qlen, int vlen, int tlen, boolean withTags) { if (withTags) { return (int) (KeyValue.getKeyValueDataStructureSize(rlen, flen, qlen, vlen, tlen)); } @@ -669,8 +669,8 @@ public class KeyValueUtil { public static void oswrite(final Cell cell, final OutputStream out, final boolean withTags) throws IOException { - if (cell instanceof KeyValue) { - KeyValue.oswrite((KeyValue) cell, out, withTags); + if (cell instanceof Streamable) { + ((Streamable)cell).write(out, withTags); } else { short rlen = cell.getRowLength(); byte flen = cell.getFamilyLength(); diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/NoTagsKeyValue.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/NoTagsKeyValue.java index c4c8351f5b6..6de66538511 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/NoTagsKeyValue.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/NoTagsKeyValue.java @@ -19,7 +19,12 @@ */ package org.apache.hadoop.hbase; +import java.io.IOException; +import java.io.OutputStream; + import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.io.util.StreamUtils; +import org.apache.hadoop.hbase.util.Bytes; /** * An extension of the KeyValue where the tags length is always 0 @@ -34,4 +39,14 @@ public class NoTagsKeyValue extends KeyValue { public int getTagsLength() { return 0; } + + @Override + public int write(OutputStream out, boolean withTags) throws IOException { + // In KeyValueUtil#oswrite we do a Cell serialization as KeyValue. Any changes doing here, pls + // check KeyValueUtil#oswrite also and do necessary changes. + // This does same as DataOuput#writeInt (big-endian, etc.) + StreamUtils.writeInt(out, this.length); + out.write(this.bytes, this.offset, this.length); + return this.length + Bytes.SIZEOF_INT; + } } \ No newline at end of file diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/Streamable.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/Streamable.java new file mode 100644 index 00000000000..be91a569c4f --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/Streamable.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + +import java.io.IOException; +import java.io.OutputStream; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * This marks a Cell as streamable to a given OutputStream. + */ +@InterfaceAudience.Private +public interface Streamable { + + /** + * Write this cell to an OutputStream. + * @param out Stream to which cell has to be written + * @return how many bytes are written. + * @throws IOException + */ + int write(OutputStream out) throws IOException; + + /** + * Write this cell to an OutputStream. + * @param out Stream to which cell has to be written + * @param withTags Whether to write tags. + * @return how many bytes are written. + * @throws IOException + */ + int write(OutputStream out, boolean withTags) throws IOException; +} \ No newline at end of file diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java index 55fd811b52e..62e81ab7f4d 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.io.encoding; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; +import java.io.OutputStream; import java.nio.ByteBuffer; import org.apache.hadoop.hbase.Cell; @@ -28,6 +29,7 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue.Type; import org.apache.hadoop.hbase.KeyValueUtil; +import org.apache.hadoop.hbase.Streamable; import org.apache.hadoop.hbase.SettableSequenceId; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.io.HeapSize; @@ -35,6 +37,7 @@ import org.apache.hadoop.hbase.io.TagCompressionContext; import org.apache.hadoop.hbase.io.hfile.BlockType; import org.apache.hadoop.hbase.io.hfile.HFileContext; import org.apache.hadoop.hbase.io.util.LRUDictionary; +import org.apache.hadoop.hbase.io.util.StreamUtils; import org.apache.hadoop.hbase.util.ByteBufferUtils; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ClassSize; @@ -345,7 +348,8 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder { // there. So this has to be an instance of SettableSequenceId. SeekerState need not be // SettableSequenceId as we never return that to top layers. When we have to, we make // ClonedSeekerState from it. - protected static class ClonedSeekerState implements Cell, HeapSize, SettableSequenceId { + protected static class ClonedSeekerState implements Cell, HeapSize, SettableSequenceId, + Streamable { private static final long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT + (4 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG) + (7 * Bytes.SIZEOF_INT) + (Bytes.SIZEOF_SHORT) + (2 * Bytes.SIZEOF_BYTE) + (2 * ClassSize.ARRAY)); @@ -534,6 +538,40 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder { public long heapSize() { return FIXED_OVERHEAD + rowLength + familyLength + qualifierLength + valueLength + tagsLength; } + + @Override + public int write(OutputStream out) throws IOException { + return write(out, true); + } + + @Override + public int write(OutputStream out, boolean withTags) throws IOException { + int lenToWrite = KeyValueUtil.length(rowLength, familyLength, qualifierLength, valueLength, + tagsLength, withTags); + StreamUtils.writeInt(out, lenToWrite); + StreamUtils.writeInt(out, keyOnlyBuffer.length); + StreamUtils.writeInt(out, valueLength); + // Write key + out.write(keyOnlyBuffer); + // Write value + assert this.currentBuffer.hasArray(); + out.write(this.currentBuffer.array(), this.currentBuffer.arrayOffset() + this.valueOffset, + this.valueLength); + if (withTags) { + // 2 bytes tags length followed by tags bytes + // tags length is serialized with 2 bytes only(short way) even if the type is int. + // As this is non -ve numbers, we save the sign bit. See HBASE-11437 + out.write((byte) (0xff & (this.tagsLength >> 8))); + out.write((byte) (0xff & this.tagsLength)); + if (this.tagCompressionContext != null) { + out.write(cloneTagsBuffer); + } else { + out.write(this.currentBuffer.array(), this.currentBuffer.arrayOffset() + this.tagsOffset, + this.tagsLength); + } + } + return lenToWrite + Bytes.SIZEOF_INT; + } } protected abstract static class