HBASE-11934 Support KeyValueCodec to encode non KeyValue cells. (Anoop Sam John)

Enis Soztutar 2014-09-10 22:18:51 -07:00
parent 8c604e14a3
commit d4850f1f17
5 changed files with 88 additions and 15 deletions

hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java

@@ -38,6 +38,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.util.StreamUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.io.IOUtils;
@@ -2413,6 +2414,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId {
* @throws IOException
* @see #create(DataInput) for the inverse function
* @see #write(KeyValue, DataOutput)
* @deprecated use {@link #oswrite(KeyValue, OutputStream, boolean)} instead
*/
@Deprecated
public static long oswrite(final KeyValue kv, final OutputStream out)
@@ -2435,15 +2437,18 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId {
* @throws IOException
* @see #create(DataInput) for the inverse function
* @see #write(KeyValue, DataOutput)
* @see KeyValueUtil#oswrite(Cell, OutputStream, boolean)
*/
public static long oswrite(final KeyValue kv, final OutputStream out, final boolean withTags)
throws IOException {
// KeyValueUtil#oswrite serializes a Cell in this same KeyValue format. Any change made here must
// also be reflected in KeyValueUtil#oswrite.
int length = kv.getLength();
if (!withTags) {
length = kv.getKeyLength() + kv.getValueLength() + KEYVALUE_INFRASTRUCTURE_SIZE;
}
// This does the same as DataOutput#writeInt (big-endian, etc.)
out.write(Bytes.toBytes(length));
StreamUtils.writeInt(out, length);
out.write(kv.getBuffer(), kv.getOffset(), length);
return length + Bytes.SIZEOF_INT;
}
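
The swap above from out.write(Bytes.toBytes(length)) to StreamUtils.writeInt keeps the wire format identical: both emit the same four big-endian bytes, the new helper just skips the temporary byte array. A minimal sketch of that equivalence (class and variable names here are illustrative, not part of the patch):

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Arrays;

import org.apache.hadoop.hbase.io.util.StreamUtils;
import org.apache.hadoop.hbase.util.Bytes;

public class WriteIntEquivalence {
  public static void main(String[] args) throws IOException {
    int length = 123456;

    // New path: write the int straight to the stream, big-endian, no allocation.
    ByteArrayOutputStream viaStreamUtils = new ByteArrayOutputStream();
    StreamUtils.writeInt(viaStreamUtils, length);

    // Old path: materialize a 4-byte array first, then write it.
    byte[] viaBytes = Bytes.toBytes(length);

    // Both are the big-endian form DataOutput#writeInt would produce.
    System.out.println(Arrays.equals(viaStreamUtils.toByteArray(), viaBytes));  // true
  }
}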

hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java

@@ -18,12 +18,15 @@
package org.apache.hadoop.hbase;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.io.util.StreamUtils;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.IterableUtils;
@@ -42,14 +45,23 @@ public class KeyValueUtil {
/**************** length *********************/
public static int length(final Cell cell) {
return (int) (KeyValue.getKeyValueDataStructureSize(cell.getRowLength(),
cell.getFamilyLength(), cell.getQualifierLength(), cell.getValueLength(),
cell.getTagsLength()));
return length(cell.getRowLength(), cell.getFamilyLength(), cell.getQualifierLength(),
cell.getValueLength(), cell.getTagsLength(), true);
}
private static int length(short rlen, byte flen, int qlen, int vlen, int tlen, boolean withTags) {
if (withTags) {
return (int) (KeyValue.getKeyValueDataStructureSize(rlen, flen, qlen, vlen, tlen));
}
return (int) (KeyValue.getKeyValueDataStructureSize(rlen, flen, qlen, vlen));
}
protected static int keyLength(final Cell cell) {
return (int)KeyValue.getKeyDataStructureSize(cell.getRowLength(), cell.getFamilyLength(),
cell.getQualifierLength());
return keyLength(cell.getRowLength(), cell.getFamilyLength(), cell.getQualifierLength());
}
private static int keyLength(short rlen, byte flen, int qlen) {
return (int) KeyValue.getKeyDataStructureSize(rlen, flen, qlen);
}
public static int lengthWithMvccVersion(final KeyValue kv, final boolean includeMvccVersion) {
@@ -514,4 +526,46 @@ public class KeyValueUtil {
return new ArrayList<KeyValue>(lazyList);
}
public static void oswrite(final Cell cell, final OutputStream out, final boolean withTags)
throws IOException {
if (cell instanceof KeyValue) {
KeyValue.oswrite((KeyValue) cell, out, withTags);
} else {
short rlen = cell.getRowLength();
byte flen = cell.getFamilyLength();
int qlen = cell.getQualifierLength();
int vlen = cell.getValueLength();
int tlen = cell.getTagsLength();
// write total length
StreamUtils.writeInt(out, length(rlen, flen, qlen, vlen, tlen, withTags));
// write key length
StreamUtils.writeInt(out, keyLength(rlen, flen, qlen));
// write value length
StreamUtils.writeInt(out, vlen);
// Write rowkey - 2 bytes rk length followed by rowkey bytes
StreamUtils.writeShort(out, rlen);
out.write(cell.getRowArray(), cell.getRowOffset(), rlen);
// Write cf - 1 byte of cf length followed by the family bytes
out.write(flen);
out.write(cell.getFamilyArray(), cell.getFamilyOffset(), flen);
// write qualifier
out.write(cell.getQualifierArray(), cell.getQualifierOffset(), qlen);
// write timestamp
StreamUtils.writeLong(out, cell.getTimestamp());
// write the type
out.write(cell.getTypeByte());
// write value
out.write(cell.getValueArray(), cell.getValueOffset(), vlen);
// write tags if we have to
if (withTags) {
// 2 bytes tags length followed by tags bytes
// The tags length is serialized in just 2 bytes (as a short) even though its type is int. Since
// it is a non-negative number, we save the sign bit. See HBASE-11437
out.write((byte) (0xff & (tlen >> 8)));
out.write((byte) (0xff & tlen));
out.write(cell.getTagsArray(), cell.getTagsOffset(), tlen);
}
}
}
}
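
The non-KeyValue branch above writes the classic KeyValue layout behind a 4-byte big-endian total-length prefix: key length (int), value length (int), row length (short) and row, family length (byte) and family, qualifier, timestamp (long), type (byte), value, and optionally a 2-byte tags length plus tags. A small round-trip sketch of that contract, using a KeyValue as the input Cell (names below are illustrative, not part of the patch):

import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.util.Bytes;

public class OswriteRoundTrip {
  public static void main(String[] args) throws IOException {
    KeyValue kv = new KeyValue(Bytes.toBytes("r1"), Bytes.toBytes("f"),
        Bytes.toBytes("q"), Bytes.toBytes("v"));

    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    KeyValueUtil.oswrite(kv, bos, false);   // serialize without tags
    byte[] serialized = bos.toByteArray();

    // The first 4 bytes are the big-endian total length written by StreamUtils.writeInt.
    int total = Bytes.toInt(serialized, 0);

    // Everything after the prefix is the standard KeyValue layout, so it can back a KeyValue as-is.
    KeyValue copy = new KeyValue(serialized, Bytes.SIZEOF_INT, total);
    System.out.println(copy);
  }
}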

hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodec.java

@@ -30,7 +30,7 @@ import org.apache.hadoop.hbase.KeyValueUtil;
/**
* Codec that does KeyValue version 1 serialization.
*
* <p>Encodes by casting Cell to KeyValue and writing out the backing array with a length prefix.
* <p>Encodes a Cell in the KeyValue serialization format, prefixed with its total length.
* This is how KVs were serialized in Puts, Deletes and Results pre-0.96. It's what would
* happen if you called the Writable#write KeyValue implementation. This encoder will fail
* if the passed Cell is not an old-school pre-0.96 KeyValue. Does not copy bytes writing.
@@ -54,10 +54,8 @@ public class KeyValueCodec implements Codec {
@Override
public void write(Cell cell) throws IOException {
checkFlushed();
// This is crass and will not work when KV changes. Also if passed a non-kv Cell, it will
// make expensive copy.
// Do not write tags over RPC
KeyValue.oswrite((KeyValue) KeyValueUtil.ensureKeyValue(cell), this.out, false);
KeyValueUtil.oswrite(cell, out, false);
}
}
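
With the change above, write accepts any Cell without converting it to a KeyValue first, and tags are never sent because withTags is false. A hedged usage sketch of the encoder/decoder pair (class and variable names are illustrative):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.codec.KeyValueCodec;
import org.apache.hadoop.hbase.util.Bytes;

public class KeyValueCodecUsage {
  public static void main(String[] args) throws IOException {
    Codec codec = new KeyValueCodec();

    // Encode: any Cell implementation may be passed; a KeyValue is used here for brevity.
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    Codec.Encoder encoder = codec.getEncoder(out);
    encoder.write(new KeyValue(Bytes.toBytes("r1"), Bytes.toBytes("f"),
        Bytes.toBytes("q"), Bytes.toBytes("v")));
    encoder.flush();

    // Decode: iterate the cells back out of the stream.
    Codec.Decoder decoder = codec.getDecoder(new ByteArrayInputStream(out.toByteArray()));
    while (decoder.advance()) {
      Cell cell = decoder.current();
      System.out.println(cell);
    }
  }
}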

hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodecWithTags.java

@@ -31,7 +31,7 @@ import org.apache.hadoop.hbase.KeyValueUtil;
* Codec that does KeyValue version 1 serialization and also serializes tags.
*
* <p>
* Encodes by casting Cell to KeyValue and writing out the backing array with a length prefix. This
* Encodes a Cell in the KeyValue serialization format, prefixed with its total length. This
* is how KVs were serialized in Puts, Deletes and Results pre-0.96. It's what would happen if you
* called the Writable#write KeyValue implementation. This encoder will fail if the passed Cell is
* not an old-school pre-0.96 KeyValue. Does not copy bytes writing. It just writes them direct to
@@ -47,7 +47,7 @@ import org.apache.hadoop.hbase.KeyValueUtil;
* KeyValue2 backing array
* </pre>
*
* Note: The only difference of this with KeyValueCodec is the latter ignores tags in KeyValues.
* Note: The only difference between this and KeyValueCodec is that the latter ignores tags in Cells.
* <b>Use this Codec only at server side.</b>
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
@@ -60,10 +60,8 @@ public class KeyValueCodecWithTags implements Codec {
@Override
public void write(Cell cell) throws IOException {
checkFlushed();
// This is crass and will not work when KV changes. Also if passed a non-kv Cell, it will
// make expensive copy.
// Write tags
KeyValue.oswrite((KeyValue) KeyValueUtil.ensureKeyValue(cell), this.out, true);
KeyValueUtil.oswrite(cell, out, true);
}
}
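
For contrast with KeyValueCodec, a hedged sketch of a tag surviving a round trip through this codec; the Tag and KeyValue constructors below are the 0.98-era forms and, like the class name, are assumptions of the sketch rather than part of the patch:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags;
import org.apache.hadoop.hbase.util.Bytes;

public class KeyValueCodecWithTagsUsage {
  public static void main(String[] args) throws IOException {
    // A cell carrying one tag (assumed Tag(byte type, String value) constructor).
    KeyValue kv = new KeyValue(Bytes.toBytes("r1"), Bytes.toBytes("f"), Bytes.toBytes("q"),
        System.currentTimeMillis(), Bytes.toBytes("v"),
        new Tag[] { new Tag((byte) 1, "example-tag") });

    Codec codec = new KeyValueCodecWithTags();
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    Codec.Encoder encoder = codec.getEncoder(out);
    encoder.write(kv);   // tags are serialized because withTags is true
    encoder.flush();

    Codec.Decoder decoder = codec.getDecoder(new ByteArrayInputStream(out.toByteArray()));
    decoder.advance();
    Cell decoded = decoder.current();
    System.out.println(decoded.getTagsLength() > 0);   // true: the tag came back
  }
}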

hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/StreamUtils.java

@@ -180,4 +180,22 @@ public class StreamUtils {
out.write((byte) (0xff & (v >> 8)));
out.write((byte) (0xff & v));
}
public static void writeInt(OutputStream out, int v) throws IOException {
out.write((byte) (0xff & (v >> 24)));
out.write((byte) (0xff & (v >> 16)));
out.write((byte) (0xff & (v >> 8)));
out.write((byte) (0xff & v));
}
public static void writeLong(OutputStream out, long v) throws IOException {
out.write((byte) (0xff & (v >> 56)));
out.write((byte) (0xff & (v >> 48)));
out.write((byte) (0xff & (v >> 40)));
out.write((byte) (0xff & (v >> 32)));
out.write((byte) (0xff & (v >> 24)));
out.write((byte) (0xff & (v >> 16)));
out.write((byte) (0xff & (v >> 8)));
out.write((byte) (0xff & v));
}
}
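
These two helpers mirror DataOutput's big-endian encoding while taking a plain OutputStream. For illustration only, hedged counterparts that read the fixed-width values back; readInt and readLong below are hypothetical helpers, not methods this patch adds to StreamUtils:

import java.io.IOException;
import java.io.InputStream;

public class FixedWidthReaders {

  // Hypothetical inverse of StreamUtils.writeInt: 4 big-endian bytes back to an int.
  public static int readInt(InputStream in) throws IOException {
    int result = 0;
    for (int i = 0; i < 4; i++) {
      int b = in.read();
      if (b < 0) {
        throw new IOException("Stream ended before 4 bytes were read");
      }
      result = (result << 8) | (b & 0xff);
    }
    return result;
  }

  // Hypothetical inverse of StreamUtils.writeLong: 8 big-endian bytes back to a long.
  public static long readLong(InputStream in) throws IOException {
    long result = 0;
    for (int i = 0; i < 8; i++) {
      int b = in.read();
      if (b < 0) {
        throw new IOException("Stream ended before 8 bytes were read");
      }
      result = (result << 8) | (b & 0xff);
    }
    return result;
  }
}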