HBASE-13754 Allow non KeyValue Cell types also to oswrite.

anoopsjohn 2015-05-28 11:12:37 +05:30
parent 353b046d6c
commit 7de4881e3a
5 changed files with 122 additions and 30 deletions
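In brief: KeyValueUtil#oswrite used to special-case KeyValue and serialize every other Cell type field by field. This commit adds a small Streamable interface so that any Cell backed by a contiguous byte[] (KeyValue, NoTagsKeyValue, and the encoder's ClonedSeekerState) can write itself straight to an OutputStream. The helper below is a hypothetical caller-side sketch, not part of this commit; it simply mirrors the dispatch that KeyValueUtil#oswrite now performs internally (see the KeyValueUtil.java hunk further down).

import java.io.IOException;
import java.io.OutputStream;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.Streamable;

// Hypothetical helper, not part of this commit.
final class CellStreamer {
  static void writeCell(Cell cell, OutputStream out, boolean withTags) throws IOException {
    if (cell instanceof Streamable) {
      // The cell knows its own backing layout and serializes itself.
      ((Streamable) cell).write(out, withTags);
    } else {
      // Generic path: KeyValueUtil rebuilds the KeyValue layout from the Cell accessors.
      KeyValueUtil.oswrite(cell, out, withTags);
    }
  }
}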

org/apache/hadoop/hbase/KeyValue.java

@@ -79,7 +79,8 @@ import com.google.common.annotations.VisibleForTesting;
* and actual tag bytes length.
*/
@InterfaceAudience.Private
public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId, SettableTimestamp {
public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
SettableTimestamp, Streamable {
private static final ArrayList<Tag> EMPTY_ARRAY_LIST = new ArrayList<Tag>();
private static final Log LOG = LogFactory.getLog(KeyValue.class);
@@ -2495,28 +2496,6 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
return length + Bytes.SIZEOF_INT;
}
/**
* Write out a KeyValue in the manner in which we used to when KeyValue was a Writable but do
* not require a {@link DataOutput}, just take plain {@link OutputStream}
* Named <code>oswrite</code> so does not clash with {@link #write(KeyValue, DataOutput)}
* @param kv
* @param out
* @return Length written on stream
* @throws IOException
* @see #create(DataInput) for the inverse function
* @see #write(KeyValue, DataOutput)
* @deprecated use {@link #oswrite(KeyValue, OutputStream, boolean)} instead
*/
@Deprecated
public static long oswrite(final KeyValue kv, final OutputStream out)
throws IOException {
int length = kv.getLength();
// This does same as DataOuput#writeInt (big-endian, etc.)
out.write(Bytes.toBytes(length));
out.write(kv.getBuffer(), kv.getOffset(), length);
return length + Bytes.SIZEOF_INT;
}
/**
* Write out a KeyValue in the manner in which we used to when KeyValue was a Writable but do
* not require a {@link DataOutput}, just take plain {@link OutputStream}
@@ -2529,18 +2508,31 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
* @see #create(DataInput) for the inverse function
* @see #write(KeyValue, DataOutput)
* @see KeyValueUtil#oswrite(Cell, OutputStream, boolean)
* @deprecated As of release 2.0.0, this will be removed in HBase 3.0.0.
* Instead use {@link #write(OutputStream, boolean)}
*/
@Deprecated
public static long oswrite(final KeyValue kv, final OutputStream out, final boolean withTags)
throws IOException {
return kv.write(out, withTags);
}
@Override
public int write(OutputStream out) throws IOException {
return write(out, true);
}
@Override
public int write(OutputStream out, boolean withTags) throws IOException {
// KeyValueUtil#oswrite also serializes a Cell in this KeyValue layout. Any change made here
// must be mirrored in KeyValueUtil#oswrite as well.
int length = kv.getLength();
int length = this.length;
if (!withTags) {
length = kv.getKeyLength() + kv.getValueLength() + KEYVALUE_INFRASTRUCTURE_SIZE;
length = this.getKeyLength() + this.getValueLength() + KEYVALUE_INFRASTRUCTURE_SIZE;
}
// This does the same as DataOutput#writeInt (big-endian, etc.)
StreamUtils.writeInt(out, length);
out.write(kv.getBuffer(), kv.getOffset(), length);
out.write(this.bytes, this.offset, length);
return length + Bytes.SIZEOF_INT;
}
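A minimal usage sketch of the new instance method; the row/family/qualifier/value literals are made up for illustration. write(out, true) emits a 4-byte big-endian length followed by the KeyValue's backing bytes, so the number of bytes written is getLength() + Bytes.SIZEOF_INT.

import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.util.Bytes;

public class KeyValueWriteSketch {
  public static void main(String[] args) throws IOException {
    KeyValue kv = new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("cf"),
        Bytes.toBytes("q"), Bytes.toBytes("value"));
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    // Writes a 4-byte length prefix followed by the KeyValue's backing bytes.
    int written = kv.write(out, true);
    assert written == kv.getLength() + Bytes.SIZEOF_INT;
    assert out.size() == written;
  }
}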

org/apache/hadoop/hbase/KeyValueUtil.java

@@ -58,7 +58,7 @@ public class KeyValueUtil {
cell.getValueLength(), cell.getTagsLength(), true);
}
private static int length(short rlen, byte flen, int qlen, int vlen, int tlen, boolean withTags) {
public static int length(short rlen, byte flen, int qlen, int vlen, int tlen, boolean withTags) {
if (withTags) {
return (int) (KeyValue.getKeyValueDataStructureSize(rlen, flen, qlen, vlen, tlen));
}
@@ -669,8 +669,8 @@ public class KeyValueUtil {
public static void oswrite(final Cell cell, final OutputStream out, final boolean withTags)
throws IOException {
if (cell instanceof KeyValue) {
KeyValue.oswrite((KeyValue) cell, out, withTags);
if (cell instanceof Streamable) {
((Streamable)cell).write(out, withTags);
} else {
short rlen = cell.getRowLength();
byte flen = cell.getFamilyLength();

org/apache/hadoop/hbase/NoTagsKeyValue.java

@@ -19,7 +19,12 @@
*/
package org.apache.hadoop.hbase;
import java.io.IOException;
import java.io.OutputStream;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.util.StreamUtils;
import org.apache.hadoop.hbase.util.Bytes;
/**
* An extension of the KeyValue where the tags length is always 0
@@ -34,4 +39,14 @@ public class NoTagsKeyValue extends KeyValue {
public int getTagsLength() {
return 0;
}
@Override
public int write(OutputStream out, boolean withTags) throws IOException {
// KeyValueUtil#oswrite also serializes a Cell in this KeyValue layout. Any change made here
// must be mirrored in KeyValueUtil#oswrite as well.
// This does the same as DataOutput#writeInt (big-endian, etc.)
StreamUtils.writeInt(out, this.length);
out.write(this.bytes, this.offset, this.length);
return this.length + Bytes.SIZEOF_INT;
}
}
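Since getTagsLength() is always 0 here, the withTags flag cannot change what gets written, which is why this override streams this.length unconditionally. A small sketch of that equivalence, assuming NoTagsKeyValue's (byte[], offset, length) constructor and a tag-less KeyValue as input; none of this is part of the commit.

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Arrays;

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.NoTagsKeyValue;
import org.apache.hadoop.hbase.util.Bytes;

public class NoTagsWriteSketch {
  public static void main(String[] args) throws IOException {
    // A KeyValue built without tags has no trailing tags section in its buffer.
    KeyValue kv = new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"),
        Bytes.toBytes("q"), Bytes.toBytes("v"));
    NoTagsKeyValue ntkv = new NoTagsKeyValue(kv.getBuffer(), kv.getOffset(), kv.getLength());
    ByteArrayOutputStream a = new ByteArrayOutputStream();
    ByteArrayOutputStream b = new ByteArrayOutputStream();
    ntkv.write(a, true);
    ntkv.write(b, false);
    // Identical output either way: there are no tags to include or drop.
    assert Arrays.equals(a.toByteArray(), b.toByteArray());
  }
}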

org/apache/hadoop/hbase/Streamable.java

@@ -0,0 +1,47 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;
import java.io.IOException;
import java.io.OutputStream;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
/**
* This marks a Cell as streamable to a given OutputStream.
*/
@InterfaceAudience.Private
public interface Streamable {
/**
* Write this cell to an OutputStream.
* @param out Stream to which the cell has to be written
* @return how many bytes are written.
* @throws IOException
*/
int write(OutputStream out) throws IOException;
/**
* Write this cell to an OutputStream.
* @param out Stream to which the cell has to be written
* @param withTags Whether to write tags.
* @return how many bytes are written.
* @throws IOException
*/
int write(OutputStream out, boolean withTags) throws IOException;
}
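The interface deliberately stays tiny: two write overloads and nothing else, so any Cell that is backed by a contiguous byte[] can opt in. A hypothetical minimal implementor (not part of HBase or this commit) that streams an already KeyValue-encoded buffer might look like this.

import java.io.IOException;
import java.io.OutputStream;

import org.apache.hadoop.hbase.Streamable;
import org.apache.hadoop.hbase.io.util.StreamUtils;
import org.apache.hadoop.hbase.util.Bytes;

// Hypothetical example, not part of this commit.
public class EncodedCellStreamable implements Streamable {
  private final byte[] encoded; // bytes already laid out in the KeyValue format

  public EncodedCellStreamable(byte[] encoded) {
    this.encoded = encoded;
  }

  @Override
  public int write(OutputStream out) throws IOException {
    return write(out, true);
  }

  @Override
  public int write(OutputStream out, boolean withTags) throws IOException {
    // Toy implementation: always writes everything it holds. A real cell would
    // drop the trailing tags section when withTags is false.
    StreamUtils.writeInt(out, encoded.length); // 4-byte length prefix, like KeyValue#write
    out.write(encoded);
    return encoded.length + Bytes.SIZEOF_INT;
  }
}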

org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java

@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.io.encoding;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.Cell;
@@ -28,6 +29,7 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.Streamable;
import org.apache.hadoop.hbase.SettableSequenceId;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.HeapSize;
@@ -35,6 +37,7 @@ import org.apache.hadoop.hbase.io.TagCompressionContext;
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.util.LRUDictionary;
import org.apache.hadoop.hbase.io.util.StreamUtils;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
@@ -345,7 +348,8 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
// there. So this has to be an instance of SettableSequenceId. SeekerState need not be
// SettableSequenceId as we never return that to top layers. When we have to, we make
// ClonedSeekerState from it.
protected static class ClonedSeekerState implements Cell, HeapSize, SettableSequenceId {
protected static class ClonedSeekerState implements Cell, HeapSize, SettableSequenceId,
Streamable {
private static final long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT
+ (4 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG) + (7 * Bytes.SIZEOF_INT)
+ (Bytes.SIZEOF_SHORT) + (2 * Bytes.SIZEOF_BYTE) + (2 * ClassSize.ARRAY));
@@ -534,6 +538,40 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
public long heapSize() {
return FIXED_OVERHEAD + rowLength + familyLength + qualifierLength + valueLength + tagsLength;
}
@Override
public int write(OutputStream out) throws IOException {
return write(out, true);
}
@Override
public int write(OutputStream out, boolean withTags) throws IOException {
int lenToWrite = KeyValueUtil.length(rowLength, familyLength, qualifierLength, valueLength,
tagsLength, withTags);
StreamUtils.writeInt(out, lenToWrite);
StreamUtils.writeInt(out, keyOnlyBuffer.length);
StreamUtils.writeInt(out, valueLength);
// Write key
out.write(keyOnlyBuffer);
// Write value
assert this.currentBuffer.hasArray();
out.write(this.currentBuffer.array(), this.currentBuffer.arrayOffset() + this.valueOffset,
this.valueLength);
if (withTags) {
// 2-byte tags length followed by the tag bytes.
// The tags length is serialized in 2 bytes (as a short) even though the field is an int;
// since it is never negative, the sign bit is not needed. See HBASE-11437
out.write((byte) (0xff & (this.tagsLength >> 8)));
out.write((byte) (0xff & this.tagsLength));
if (this.tagCompressionContext != null) {
out.write(cloneTagsBuffer);
} else {
out.write(this.currentBuffer.array(), this.currentBuffer.arrayOffset() + this.tagsOffset,
this.tagsLength);
}
}
return lenToWrite + Bytes.SIZEOF_INT;
}
}
protected abstract static class
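For reference, the bytes that ClonedSeekerState#write(out, true) puts on the stream are: a 4-byte total cell length, a 4-byte key length, a 4-byte value length, the key bytes, the value bytes, a 2-byte tags length, and the tag bytes, all big-endian, matching what StreamUtils.writeInt and DataOutput produce. The read-back sketch below is not part of the commit; it assumes input produced by one of the write(out, true) implementations above and a cell that actually carries tags.

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;

// Hypothetical read-back sketch, not part of this commit.
public class CellReadBackSketch {
  public static void readOne(byte[] serialized) throws IOException {
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized));
    int cellLength = in.readInt();            // total length of everything that follows
    int keyLength = in.readInt();             // length of the key section
    int valueLength = in.readInt();           // length of the value section
    byte[] key = new byte[keyLength];
    in.readFully(key);                        // row/family/qualifier/timestamp/type
    byte[] value = new byte[valueLength];
    in.readFully(value);
    int tagsLength = in.readUnsignedShort();  // 2-byte, non-negative tags length (HBASE-11437)
    byte[] tags = new byte[tagsLength];
    in.readFully(tags);
    System.out.println("cell length " + cellLength + ", tags " + tagsLength + " bytes");
  }
}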