HBASE-11874 Support Cell to be passed to StoreFile.Writer rather than KeyValue.

parent 7547426705
commit 01cabe1c05

@@ -18,6 +18,7 @@
package org.apache.hadoop.hbase;

import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Iterator;

@@ -29,6 +30,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.ByteRange;
import org.apache.hadoop.hbase.util.Bytes;

@@ -560,4 +562,126 @@ public final class CellUtil {
    return cell.getRowLength() + cell.getFamilyLength() + cell.getQualifierLength()
        + cell.getValueLength() + cell.getTagsLength() + KeyValue.TIMESTAMP_TYPE_SIZE;
  }

  /**
   * Writes the Cell's key part as it would have serialized in a KeyValue. The format is <2 bytes
   * rk len><rk><1 byte cf len><cf><qualifier><8 bytes timestamp><1 byte type>
   * @param cell
   * @param out
   * @throws IOException
   */
  public static void writeFlatKey(Cell cell, DataOutputStream out) throws IOException {
    short rowLen = cell.getRowLength();
    out.writeShort(rowLen);
    out.write(cell.getRowArray(), cell.getRowOffset(), rowLen);
    byte fLen = cell.getFamilyLength();
    out.writeByte(fLen);
    out.write(cell.getFamilyArray(), cell.getFamilyOffset(), fLen);
    out.write(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
    out.writeLong(cell.getTimestamp());
    out.writeByte(cell.getTypeByte());
  }

  /**
   * Write rowkey excluding the common part.
   * @param cell
   * @param rLen
   * @param commonPrefix
   * @param out
   * @throws IOException
   */
  public static void writeRowKeyExcludingCommon(Cell cell, short rLen, int commonPrefix,
      DataOutputStream out) throws IOException {
    if (commonPrefix == 0) {
      out.writeShort(rLen);
    } else if (commonPrefix == 1) {
      out.writeByte((byte) rLen);
      commonPrefix--;
    } else {
      commonPrefix -= KeyValue.ROW_LENGTH_SIZE;
    }
    if (rLen > commonPrefix) {
      out.write(cell.getRowArray(), cell.getRowOffset() + commonPrefix, rLen - commonPrefix);
    }
  }

  /**
   * Find length of common prefix in keys of the cells, considering key as byte[] if serialized in
   * {@link KeyValue}. The key format is <2 bytes rk len><rk><1 byte cf
   * len><cf><qualifier><8 bytes timestamp><1 byte type>
   * @param c1
   *          the cell
   * @param c2
   *          the cell
   * @param bypassFamilyCheck
   *          when true assume the family bytes same in both cells. Pass it as true when dealing
   *          with Cells in same CF so as to avoid some checks
   * @param withTsType
   *          when true check timestamp and type bytes also.
   * @return length of common prefix
   */
  public static int findCommonPrefixInFlatKey(Cell c1, Cell c2, boolean bypassFamilyCheck,
      boolean withTsType) {
    // Compare the 2 bytes in RK length part
    short rLen1 = c1.getRowLength();
    short rLen2 = c2.getRowLength();
    int commonPrefix = KeyValue.ROW_LENGTH_SIZE;
    if (rLen1 != rLen2) {
      // early out when the RK length itself is not matching
      return ByteBufferUtils.findCommonPrefix(Bytes.toBytes(rLen1), 0, KeyValue.ROW_LENGTH_SIZE,
          Bytes.toBytes(rLen2), 0, KeyValue.ROW_LENGTH_SIZE);
    }
    // Compare the RKs
    int rkCommonPrefix = ByteBufferUtils.findCommonPrefix(c1.getRowArray(), c1.getRowOffset(),
        rLen1, c2.getRowArray(), c2.getRowOffset(), rLen2);
    commonPrefix += rkCommonPrefix;
    if (rkCommonPrefix != rLen1) {
      // Early out when RK is not fully matching.
      return commonPrefix;
    }
    // Compare 1 byte CF length part
    byte fLen1 = c1.getFamilyLength();
    if (bypassFamilyCheck) {
      // This flag will be true when caller is sure that the family will be same for both the cells
      // Just make commonPrefix to increment by the family part
      commonPrefix += KeyValue.FAMILY_LENGTH_SIZE + fLen1;
    } else {
      byte fLen2 = c2.getFamilyLength();
      if (fLen1 != fLen2) {
        // early out when the CF length itself is not matching
        return commonPrefix;
      }
      // CF lengths are same so there is one more byte common in key part
      commonPrefix += KeyValue.FAMILY_LENGTH_SIZE;
      // Compare the CF names
      int fCommonPrefix = ByteBufferUtils.findCommonPrefix(c1.getFamilyArray(),
          c1.getFamilyOffset(), fLen1, c2.getFamilyArray(), c2.getFamilyOffset(), fLen2);
      commonPrefix += fCommonPrefix;
      if (fCommonPrefix != fLen1) {
        return commonPrefix;
      }
    }
    // Compare the Qualifiers
    int qLen1 = c1.getQualifierLength();
    int qLen2 = c2.getQualifierLength();
    int qCommon = ByteBufferUtils.findCommonPrefix(c1.getQualifierArray(), c1.getQualifierOffset(),
        qLen1, c2.getQualifierArray(), c2.getQualifierOffset(), qLen2);
    commonPrefix += qCommon;
    if (!withTsType || Math.max(qLen1, qLen2) != qCommon) {
      return commonPrefix;
    }
    // Compare the timestamp parts
    int tsCommonPrefix = ByteBufferUtils.findCommonPrefix(Bytes.toBytes(c1.getTimestamp()), 0,
        KeyValue.TIMESTAMP_SIZE, Bytes.toBytes(c2.getTimestamp()), 0, KeyValue.TIMESTAMP_SIZE);
    commonPrefix += tsCommonPrefix;
    if (tsCommonPrefix != KeyValue.TIMESTAMP_SIZE) {
      return commonPrefix;
    }
    // Compare the type
    if (c1.getTypeByte() == c2.getTypeByte()) {
      commonPrefix += KeyValue.TYPE_SIZE;
    }
    return commonPrefix;
  }
}
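
For illustration, a minimal usage sketch of the new flat-key helpers above (not part of the patch; the class name and the row/family/qualifier values are made up). It serializes a KeyValue's key with writeFlatKey and compares two keys with findCommonPrefixInFlatKey:

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.util.Bytes;

public class FlatKeyExample {
  public static void main(String[] args) throws IOException {
    KeyValue kv1 = new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("f"),
        Bytes.toBytes("q1"), Bytes.toBytes("v1"));
    KeyValue kv2 = new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("f"),
        Bytes.toBytes("q2"), Bytes.toBytes("v2"));

    // Serialize kv1's key exactly as it would appear inside a KeyValue:
    // <2 byte row len><row><1 byte cf len><cf><qualifier><8 byte ts><1 byte type>
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(baos);
    CellUtil.writeFlatKey(kv1, out);
    System.out.println("flat key length = " + baos.size()); // equals kv1.getKeyLength()

    // Common prefix of the two flat keys; true = same family assumed, false = ignore ts/type.
    int common = CellUtil.findCommonPrefixInFlatKey(kv1, kv2, true, false);
    System.out.println("common flat-key prefix = " + common);
  }
}
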
@@ -44,6 +44,11 @@ public class KeyValueUtil {

  /**************** length *********************/

  /**
   * Returns number of bytes this cell would have been used if serialized as in {@link KeyValue}
   * @param cell
   * @return the length
   */
  public static int length(final Cell cell) {
    return length(cell.getRowLength(), cell.getFamilyLength(), cell.getQualifierLength(),
        cell.getValueLength(), cell.getTagsLength(), true);

@@ -56,7 +61,13 @@ public class KeyValueUtil {
    return (int) (KeyValue.getKeyValueDataStructureSize(rlen, flen, qlen, vlen));
  }

  protected static int keyLength(final Cell cell) {
  /**
   * Returns number of bytes this cell's key part would have been used if serialized as in
   * {@link KeyValue}. Key includes rowkey, family, qualifier, timestamp and type.
   * @param cell
   * @return the key length
   */
  public static int keyLength(final Cell cell) {
    return keyLength(cell.getRowLength(), cell.getFamilyLength(), cell.getQualifierLength());
  }

@@ -93,7 +104,7 @@ public class KeyValueUtil {

  public static ByteBuffer copyKeyToNewByteBuffer(final Cell cell) {
    byte[] bytes = new byte[keyLength(cell)];
    appendKeyToByteArrayWithoutValue(cell, bytes, 0);
    appendKeyTo(cell, bytes, 0);
    ByteBuffer buffer = ByteBuffer.wrap(bytes);
    buffer.position(buffer.limit());//make it look as if each field were appended
    return buffer;

@@ -106,7 +117,7 @@ public class KeyValueUtil {
    return backingBytes;
  }

  protected static int appendKeyToByteArrayWithoutValue(final Cell cell, final byte[] output,
  public static int appendKeyTo(final Cell cell, final byte[] output,
      final int offset) {
    int nextOffset = offset;
    nextOffset = Bytes.putShort(output, nextOffset, cell.getRowLength());

@@ -126,7 +137,7 @@ public class KeyValueUtil {
    int pos = offset;
    pos = Bytes.putInt(output, pos, keyLength(cell));
    pos = Bytes.putInt(output, pos, cell.getValueLength());
    pos = appendKeyToByteArrayWithoutValue(cell, output, pos);
    pos = appendKeyTo(cell, output, pos);
    pos = CellUtil.copyValueTo(cell, output, pos);
    if ((cell.getTagsLength() > 0)) {
      pos = Bytes.putAsShort(output, pos, cell.getTagsLength());
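
A short sketch of how these KeyValueUtil helpers are meant to be used (illustrative only, not patch code; the class name is made up):

import java.nio.ByteBuffer;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.util.Bytes;

public class KeyLengthExample {
  public static void main(String[] args) {
    Cell cell = new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("f"),
        Bytes.toBytes("q"), Bytes.toBytes("value"));

    // Key length as it would be serialized in a KeyValue:
    // 2 (row length) + row + 1 (family length) + family + qualifier + 8 (ts) + 1 (type)
    int klen = KeyValueUtil.keyLength(cell);

    // Copy just the key part into a caller-supplied array, as the HFile writers now do
    // for firstKeyInBlock and LASTKEY instead of slicing a backing KeyValue buffer.
    byte[] key = new byte[klen];
    KeyValueUtil.appendKeyTo(cell, key, 0);

    // Or get the key wrapped in a ByteBuffer positioned at its limit.
    ByteBuffer keyBuf = KeyValueUtil.copyKeyToNewByteBuffer(cell);
    System.out.println(klen + " key bytes; buffer limit = " + keyBuf.limit());
  }
}
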
@ -835,17 +835,17 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
|
|||
}
|
||||
|
||||
/**
|
||||
* @param kv
|
||||
* @param cell
|
||||
* @param out
|
||||
* @param encodingCtx
|
||||
* @return unencoded size added
|
||||
* @throws IOException
|
||||
*/
|
||||
protected final int afterEncodingKeyValue(KeyValue kv, DataOutputStream out,
|
||||
protected final int afterEncodingKeyValue(Cell cell, DataOutputStream out,
|
||||
HFileBlockDefaultEncodingContext encodingCtx) throws IOException {
|
||||
int size = 0;
|
||||
if (encodingCtx.getHFileContext().isIncludesTags()) {
|
||||
int tagsLength = kv.getTagsLength();
|
||||
int tagsLength = cell.getTagsLength();
|
||||
ByteBufferUtils.putCompressedInt(out, tagsLength);
|
||||
// There are some tags to be written
|
||||
if (tagsLength > 0) {
|
||||
|
@ -854,16 +854,16 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
|
|||
// the tags using Dictionary compression in such a case
|
||||
if (tagCompressionContext != null) {
|
||||
tagCompressionContext
|
||||
.compressTags(out, kv.getTagsArray(), kv.getTagsOffset(), tagsLength);
|
||||
.compressTags(out, cell.getTagsArray(), cell.getTagsOffset(), tagsLength);
|
||||
} else {
|
||||
out.write(kv.getTagsArray(), kv.getTagsOffset(), tagsLength);
|
||||
out.write(cell.getTagsArray(), cell.getTagsOffset(), tagsLength);
|
||||
}
|
||||
}
|
||||
size += tagsLength + KeyValue.TAGS_LENGTH_SIZE;
|
||||
}
|
||||
if (encodingCtx.getHFileContext().isIncludesMvcc()) {
|
||||
// Copy memstore timestamp from the byte buffer to the output stream.
|
||||
long memstoreTS = kv.getMvccVersion();
|
||||
long memstoreTS = cell.getSequenceId();
|
||||
WritableUtils.writeVLong(out, memstoreTS);
|
||||
// TODO use a writeVLong which returns the #bytes written so that 2 time parsing can be
|
||||
// avoided.
|
||||
|
@ -973,16 +973,16 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
|
|||
}
|
||||
|
||||
@Override
|
||||
public int encode(KeyValue kv, HFileBlockEncodingContext encodingCtx, DataOutputStream out)
|
||||
public int encode(Cell cell, HFileBlockEncodingContext encodingCtx, DataOutputStream out)
|
||||
throws IOException {
|
||||
BufferedDataBlockEncodingState state = (BufferedDataBlockEncodingState) encodingCtx
|
||||
.getEncodingState();
|
||||
int encodedKvSize = internalEncode(kv, (HFileBlockDefaultEncodingContext) encodingCtx, out);
|
||||
int encodedKvSize = internalEncode(cell, (HFileBlockDefaultEncodingContext) encodingCtx, out);
|
||||
state.unencodedDataSizeWritten += encodedKvSize;
|
||||
return encodedKvSize;
|
||||
}
|
||||
|
||||
public abstract int internalEncode(KeyValue kv, HFileBlockDefaultEncodingContext encodingCtx,
|
||||
public abstract int internalEncode(Cell cell, HFileBlockDefaultEncodingContext encodingCtx,
|
||||
DataOutputStream out) throws IOException;
|
||||
|
||||
@Override
|
||||
|
|
|
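
To make the per-cell bookkeeping in afterEncodingKeyValue above concrete, here is a small standalone sketch (illustrative only, not patch code; names are made up) of the trailer each encoder writes after a cell when tags and MVCC are enabled, mirroring the size accounting the encoders in this patch use:

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.WritableUtils;

public class CellTrailerSizeExample {
  // Writes <compressed tags length><tag bytes><mvcc vint> and returns the number of
  // unencoded bytes that the trailer accounts for.
  static int writeTrailer(Cell cell, DataOutputStream out, boolean includeTags,
      boolean includeMvcc) throws IOException {
    int size = 0;
    if (includeTags) {
      int tagsLength = cell.getTagsLength();
      ByteBufferUtils.putCompressedInt(out, tagsLength);
      if (tagsLength > 0) {
        out.write(cell.getTagsArray(), cell.getTagsOffset(), tagsLength);
      }
      size += tagsLength + KeyValue.TAGS_LENGTH_SIZE;
    }
    if (includeMvcc) {
      // Sequence id (memstore timestamp) is written as a vint.
      WritableUtils.writeVLong(out, cell.getSequenceId());
      size += WritableUtils.getVIntSize(cell.getSequenceId());
    }
    return size;
  }

  public static void main(String[] args) throws IOException {
    Cell cell = new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"), Bytes.toBytes("q"),
        Bytes.toBytes("v"));
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    System.out.println(writeTrailer(cell, new DataOutputStream(baos), true, true));
  }
}
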
@ -22,7 +22,10 @@ import java.io.IOException;
|
|||
import java.nio.ByteBuffer;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.CellUtil;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.KeyValueUtil;
|
||||
import org.apache.hadoop.hbase.KeyValue.KVComparator;
|
||||
import org.apache.hadoop.hbase.util.ByteBufferUtils;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
|
@ -36,28 +39,28 @@ import org.apache.hadoop.io.WritableUtils;
|
|||
public class CopyKeyDataBlockEncoder extends BufferedDataBlockEncoder {
|
||||
|
||||
@Override
|
||||
public int internalEncode(KeyValue kv, HFileBlockDefaultEncodingContext encodingContext,
|
||||
public int internalEncode(Cell cell, HFileBlockDefaultEncodingContext encodingContext,
|
||||
DataOutputStream out) throws IOException {
|
||||
int klength = kv.getKeyLength();
|
||||
int vlength = kv.getValueLength();
|
||||
int klength = KeyValueUtil.keyLength(cell);
|
||||
int vlength = cell.getValueLength();
|
||||
|
||||
out.writeInt(klength);
|
||||
out.writeInt(vlength);
|
||||
out.write(kv.getBuffer(), kv.getKeyOffset(), klength);
|
||||
out.write(kv.getValueArray(), kv.getValueOffset(), vlength);
|
||||
CellUtil.writeFlatKey(cell, out);
|
||||
out.write(cell.getValueArray(), cell.getValueOffset(), vlength);
|
||||
int size = klength + vlength + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE;
|
||||
// Write the additional tag into the stream
|
||||
if (encodingContext.getHFileContext().isIncludesTags()) {
|
||||
int tagsLength = kv.getTagsLength();
|
||||
int tagsLength = cell.getTagsLength();
|
||||
out.writeShort(tagsLength);
|
||||
if (tagsLength > 0) {
|
||||
out.write(kv.getTagsArray(), kv.getTagsOffset(), tagsLength);
|
||||
out.write(cell.getTagsArray(), cell.getTagsOffset(), tagsLength);
|
||||
}
|
||||
size += tagsLength + KeyValue.TAGS_LENGTH_SIZE;
|
||||
}
|
||||
if (encodingContext.getHFileContext().isIncludesMvcc()) {
|
||||
WritableUtils.writeVLong(out, kv.getMvccVersion());
|
||||
size += WritableUtils.getVIntSize(kv.getMvccVersion());
|
||||
WritableUtils.writeVLong(out, cell.getSequenceId());
|
||||
size += WritableUtils.getVIntSize(cell.getSequenceId());
|
||||
}
|
||||
return size;
|
||||
}
|
||||
|
|
|
@@ -23,7 +23,6 @@ import java.nio.ByteBuffer;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.KVComparator;
import org.apache.hadoop.hbase.io.hfile.HFileContext;

@@ -53,13 +52,13 @@ public interface DataBlockEncoder {

  /**
   * Encodes a KeyValue.
   * @param kv
   * @param cell
   * @param encodingCtx
   * @param out
   * @return unencoded kv size written
   * @throws IOException
   */
  int encode(KeyValue kv, HFileBlockEncodingContext encodingCtx, DataOutputStream out)
  int encode(Cell cell, HFileBlockEncodingContext encodingCtx, DataOutputStream out)
      throws IOException;

  /**
@ -22,7 +22,10 @@ import java.io.IOException;
|
|||
import java.nio.ByteBuffer;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.CellUtil;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.KeyValueUtil;
|
||||
import org.apache.hadoop.hbase.KeyValue.KVComparator;
|
||||
import org.apache.hadoop.hbase.util.ByteBufferUtils;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
|
@ -192,59 +195,55 @@ public class DiffKeyDeltaEncoder extends BufferedDataBlockEncoder {
|
|||
}
|
||||
|
||||
@Override
|
||||
public int internalEncode(KeyValue kv, HFileBlockDefaultEncodingContext encodingContext,
|
||||
public int internalEncode(Cell cell, HFileBlockDefaultEncodingContext encodingContext,
|
||||
DataOutputStream out) throws IOException {
|
||||
EncodingState state = encodingContext.getEncodingState();
|
||||
int size = compressSingleKeyValue(out, kv, state.prevKv);
|
||||
size += afterEncodingKeyValue(kv, out, encodingContext);
|
||||
state.prevKv = kv;
|
||||
int size = compressSingleKeyValue(out, cell, state.prevCell);
|
||||
size += afterEncodingKeyValue(cell, out, encodingContext);
|
||||
state.prevCell = cell;
|
||||
return size;
|
||||
}
|
||||
|
||||
private int compressSingleKeyValue(DataOutputStream out, KeyValue kv, KeyValue prevKv)
|
||||
private int compressSingleKeyValue(DataOutputStream out, Cell cell, Cell prevCell)
|
||||
throws IOException {
|
||||
byte flag = 0;
|
||||
int kLength = kv.getKeyLength();
|
||||
int vLength = kv.getValueLength();
|
||||
int kLength = KeyValueUtil.keyLength(cell);
|
||||
int vLength = cell.getValueLength();
|
||||
|
||||
long timestamp;
|
||||
long diffTimestamp = 0;
|
||||
int diffTimestampFitsInBytes = 0;
|
||||
int timestampFitsInBytes;
|
||||
int commonPrefix;
|
||||
byte[] curKvBuf = kv.getBuffer();
|
||||
int commonPrefix = 0;
|
||||
|
||||
if (prevKv == null) {
|
||||
timestamp = kv.getTimestamp();
|
||||
if (prevCell == null) {
|
||||
timestamp = cell.getTimestamp();
|
||||
if (timestamp < 0) {
|
||||
flag |= FLAG_TIMESTAMP_SIGN;
|
||||
timestamp = -timestamp;
|
||||
}
|
||||
timestampFitsInBytes = ByteBufferUtils.longFitsIn(timestamp);
|
||||
flag |= (timestampFitsInBytes - 1) << SHIFT_TIMESTAMP_LENGTH;
|
||||
commonPrefix = 0;
|
||||
// put column family
|
||||
byte familyLength = kv.getFamilyLength();
|
||||
byte familyLength = cell.getFamilyLength();
|
||||
out.write(familyLength);
|
||||
out.write(kv.getFamilyArray(), kv.getFamilyOffset(), familyLength);
|
||||
out.write(cell.getFamilyArray(), cell.getFamilyOffset(), familyLength);
|
||||
} else {
|
||||
// Finding common prefix
|
||||
int preKeyLength = prevKv.getKeyLength();
|
||||
commonPrefix = ByteBufferUtils.findCommonPrefix(curKvBuf, kv.getKeyOffset(), kLength
|
||||
- KeyValue.TIMESTAMP_TYPE_SIZE, prevKv.getBuffer(), prevKv.getKeyOffset(), preKeyLength
|
||||
- KeyValue.TIMESTAMP_TYPE_SIZE);
|
||||
int preKeyLength = KeyValueUtil.keyLength(prevCell);
|
||||
commonPrefix = CellUtil.findCommonPrefixInFlatKey(cell, prevCell, true, false);
|
||||
if (kLength == preKeyLength) {
|
||||
flag |= FLAG_SAME_KEY_LENGTH;
|
||||
}
|
||||
if (vLength == prevKv.getValueLength()) {
|
||||
if (vLength == prevCell.getValueLength()) {
|
||||
flag |= FLAG_SAME_VALUE_LENGTH;
|
||||
}
|
||||
if (kv.getTypeByte() == prevKv.getTypeByte()) {
|
||||
if (cell.getTypeByte() == prevCell.getTypeByte()) {
|
||||
flag |= FLAG_SAME_TYPE;
|
||||
}
|
||||
// don't compress timestamp and type using prefix encode timestamp
|
||||
timestamp = kv.getTimestamp();
|
||||
diffTimestamp = prevKv.getTimestamp() - timestamp;
|
||||
timestamp = cell.getTimestamp();
|
||||
diffTimestamp = prevCell.getTimestamp() - timestamp;
|
||||
boolean negativeTimestamp = timestamp < 0;
|
||||
if (negativeTimestamp) {
|
||||
timestamp = -timestamp;
|
||||
|
@ -276,13 +275,21 @@ public class DiffKeyDeltaEncoder extends BufferedDataBlockEncoder {
|
|||
ByteBufferUtils.putCompressedInt(out, vLength);
|
||||
}
|
||||
ByteBufferUtils.putCompressedInt(out, commonPrefix);
|
||||
if (prevKv == null || commonPrefix < kv.getRowLength() + KeyValue.ROW_LENGTH_SIZE) {
|
||||
int restRowLength = kv.getRowLength() + KeyValue.ROW_LENGTH_SIZE - commonPrefix;
|
||||
out.write(curKvBuf, kv.getKeyOffset() + commonPrefix, restRowLength);
|
||||
out.write(curKvBuf, kv.getQualifierOffset(), kv.getQualifierLength());
|
||||
short rLen = cell.getRowLength();
|
||||
if (commonPrefix < rLen + KeyValue.ROW_LENGTH_SIZE) {
|
||||
// Previous and current rows are different. Copy the differing part of
|
||||
// the row, skip the column family, and copy the qualifier.
|
||||
CellUtil.writeRowKeyExcludingCommon(cell, rLen, commonPrefix, out);
|
||||
out.write(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
|
||||
} else {
|
||||
out.write(curKvBuf, kv.getKeyOffset() + commonPrefix, kLength - commonPrefix
|
||||
- KeyValue.TIMESTAMP_TYPE_SIZE);
|
||||
// The common part includes the whole row. As the column family is the
|
||||
// same across the whole file, it will automatically be included in the
|
||||
// common prefix, so we need not special-case it here.
|
||||
// What we write here is the non common part of the qualifier
|
||||
int commonQualPrefix = commonPrefix - (rLen + KeyValue.ROW_LENGTH_SIZE)
|
||||
- (cell.getFamilyLength() + KeyValue.FAMILY_LENGTH_SIZE);
|
||||
out.write(cell.getQualifierArray(), cell.getQualifierOffset() + commonQualPrefix,
|
||||
cell.getQualifierLength() - commonQualPrefix);
|
||||
}
|
||||
if ((flag & FLAG_TIMESTAMP_IS_DIFF) == 0) {
|
||||
ByteBufferUtils.putLong(out, timestamp, timestampFitsInBytes);
|
||||
|
@ -291,9 +298,9 @@ public class DiffKeyDeltaEncoder extends BufferedDataBlockEncoder {
|
|||
}
|
||||
|
||||
if ((flag & FLAG_SAME_TYPE) == 0) {
|
||||
out.write(kv.getTypeByte());
|
||||
out.write(cell.getTypeByte());
|
||||
}
|
||||
out.write(kv.getValueArray(), kv.getValueOffset(), vLength);
|
||||
out.write(cell.getValueArray(), cell.getValueOffset(), vLength);
|
||||
return kLength + vLength + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE;
|
||||
}
|
||||
|
||||
|
|
|
@@ -19,7 +19,7 @@
package org.apache.hadoop.hbase.io.encoding;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Cell;

/**
 * Keeps track of the encoding state.

@@ -28,7 +28,7 @@ import org.apache.hadoop.hbase.KeyValue;
public class EncodingState {

  /**
   * The previous KeyValue the encoder encoded.
   * The previous Cell the encoder encoded.
   */
  protected KeyValue prevKv = null;
  protected Cell prevCell = null;
}
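
A toy illustration (not patch code; names are made up) of how the delta encoders in this patch use this state: each internalEncode call compares the current cell against the previously written cell with CellUtil.findCommonPrefixInFlatKey, writes only the non-common suffix, and then remembers the current cell as prevCell for the next call.

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.util.Bytes;

public class PrevCellSketch {
  // Encode each key as <common-prefix length><non-common key suffix>, tracking the
  // previously encoded cell the same way EncodingState.prevCell is used above.
  static void encodeKeys(List<Cell> cells, DataOutputStream out) throws IOException {
    Cell prevCell = null;
    for (Cell cell : cells) {
      int common = (prevCell == null)
          ? 0
          : CellUtil.findCommonPrefixInFlatKey(cell, prevCell, true, true);
      out.writeInt(common);
      // Materialize the flat key and emit only the part after the common prefix.
      ByteArrayOutputStream keyBuf = new ByteArrayOutputStream();
      CellUtil.writeFlatKey(cell, new DataOutputStream(keyBuf));
      byte[] flatKey = keyBuf.toByteArray();
      out.write(flatKey, common, flatKey.length - common);
      prevCell = cell;
    }
  }

  public static void main(String[] args) throws IOException {
    List<Cell> cells = Arrays.<Cell> asList(
        new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("f"), Bytes.toBytes("q1"), Bytes.toBytes("v")),
        new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("f"), Bytes.toBytes("q2"), Bytes.toBytes("v")));
    encodeKeys(cells, new DataOutputStream(new ByteArrayOutputStream()));
  }
}
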
@ -22,7 +22,10 @@ import java.io.IOException;
|
|||
import java.nio.ByteBuffer;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.CellUtil;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.KeyValueUtil;
|
||||
import org.apache.hadoop.hbase.KeyValue.KVComparator;
|
||||
import org.apache.hadoop.hbase.util.ByteBufferUtils;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
|
@ -101,11 +104,10 @@ public class FastDiffDeltaEncoder extends BufferedDataBlockEncoder {
|
|||
|
||||
}
|
||||
|
||||
private int findCommonTimestampPrefix(byte[] curKvBuf, int curKvTsOff, byte[] preKvBuf,
|
||||
int preKvTsOff) {
|
||||
private int findCommonTimestampPrefix(byte[] curTsBuf, byte[] prevTsBuf) {
|
||||
int commonPrefix = 0;
|
||||
while (commonPrefix < (KeyValue.TIMESTAMP_SIZE - 1)
|
||||
&& curKvBuf[curKvTsOff + commonPrefix] == preKvBuf[preKvTsOff + commonPrefix]) {
|
||||
&& curTsBuf[commonPrefix] == prevTsBuf[commonPrefix]) {
|
||||
commonPrefix++;
|
||||
}
|
||||
return commonPrefix; // has to be at most 7 bytes
|
||||
|
@ -237,59 +239,57 @@ public class FastDiffDeltaEncoder extends BufferedDataBlockEncoder {
|
|||
}
|
||||
|
||||
@Override
|
||||
public int internalEncode(KeyValue kv, HFileBlockDefaultEncodingContext encodingContext,
|
||||
public int internalEncode(Cell cell, HFileBlockDefaultEncodingContext encodingContext,
|
||||
DataOutputStream out) throws IOException {
|
||||
EncodingState state = encodingContext.getEncodingState();
|
||||
int size = compressSingleKeyValue(out, kv, state.prevKv);
|
||||
size += afterEncodingKeyValue(kv, out, encodingContext);
|
||||
state.prevKv = kv;
|
||||
int size = compressSingleKeyValue(out, cell, state.prevCell);
|
||||
size += afterEncodingKeyValue(cell, out, encodingContext);
|
||||
state.prevCell = cell;
|
||||
return size;
|
||||
}
|
||||
|
||||
private int compressSingleKeyValue(DataOutputStream out, KeyValue kv, KeyValue prevKv)
|
||||
private int compressSingleKeyValue(DataOutputStream out, Cell cell, Cell prevCell)
|
||||
throws IOException {
|
||||
byte flag = 0;
|
||||
int kLength = kv.getKeyLength();
|
||||
int vLength = kv.getValueLength();
|
||||
byte[] curKvBuf = kv.getBuffer();
|
||||
int kLength = KeyValueUtil.keyLength(cell);
|
||||
int vLength = cell.getValueLength();
|
||||
|
||||
if (prevKv == null) {
|
||||
if (prevCell == null) {
|
||||
// copy the key, there is no common prefix with none
|
||||
out.write(flag);
|
||||
ByteBufferUtils.putCompressedInt(out, kLength);
|
||||
ByteBufferUtils.putCompressedInt(out, vLength);
|
||||
ByteBufferUtils.putCompressedInt(out, 0);
|
||||
out.write(curKvBuf, kv.getKeyOffset(), kLength + vLength);
|
||||
CellUtil.writeFlatKey(cell, out);
|
||||
// Write the value part
|
||||
out.write(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
|
||||
} else {
|
||||
byte[] preKvBuf = prevKv.getBuffer();
|
||||
int preKeyLength = prevKv.getKeyLength();
|
||||
int preValLength = prevKv.getValueLength();
|
||||
int preKeyLength = KeyValueUtil.keyLength(prevCell);
|
||||
int preValLength = prevCell.getValueLength();
|
||||
// find a common prefix and skip it
|
||||
int commonPrefix = ByteBufferUtils.findCommonPrefix(curKvBuf, kv.getKeyOffset(), kLength
|
||||
- KeyValue.TIMESTAMP_TYPE_SIZE, preKvBuf, prevKv.getKeyOffset(), preKeyLength
|
||||
- KeyValue.TIMESTAMP_TYPE_SIZE);
|
||||
int commonPrefix = CellUtil.findCommonPrefixInFlatKey(cell, prevCell, true, false);
|
||||
|
||||
if (kLength == prevKv.getKeyLength()) {
|
||||
if (kLength == preKeyLength) {
|
||||
flag |= FLAG_SAME_KEY_LENGTH;
|
||||
}
|
||||
if (vLength == prevKv.getValueLength()) {
|
||||
if (vLength == prevCell.getValueLength()) {
|
||||
flag |= FLAG_SAME_VALUE_LENGTH;
|
||||
}
|
||||
if (kv.getTypeByte() == prevKv.getTypeByte()) {
|
||||
if (cell.getTypeByte() == prevCell.getTypeByte()) {
|
||||
flag |= FLAG_SAME_TYPE;
|
||||
}
|
||||
|
||||
int commonTimestampPrefix = findCommonTimestampPrefix(curKvBuf, kv.getKeyOffset() + kLength
|
||||
- KeyValue.TIMESTAMP_TYPE_SIZE, preKvBuf, prevKv.getKeyOffset() + preKeyLength
|
||||
- KeyValue.TIMESTAMP_TYPE_SIZE);
|
||||
byte[] curTsBuf = Bytes.toBytes(cell.getTimestamp());
|
||||
int commonTimestampPrefix = findCommonTimestampPrefix(curTsBuf,
|
||||
Bytes.toBytes(prevCell.getTimestamp()));
|
||||
|
||||
flag |= commonTimestampPrefix << SHIFT_TIMESTAMP_LENGTH;
|
||||
|
||||
// Check if current and previous values are the same. Compare value
|
||||
// length first as an optimization.
|
||||
if (vLength == preValLength
|
||||
&& Bytes.equals(kv.getValueArray(), kv.getValueOffset(), vLength,
|
||||
prevKv.getValueArray(), prevKv.getValueOffset(), preValLength)) {
|
||||
&& Bytes.equals(cell.getValueArray(), cell.getValueOffset(), vLength,
|
||||
prevCell.getValueArray(), prevCell.getValueOffset(), preValLength)) {
|
||||
flag |= FLAG_SAME_VALUE;
|
||||
}
|
||||
|
||||
|
@ -301,31 +301,33 @@ public class FastDiffDeltaEncoder extends BufferedDataBlockEncoder {
|
|||
ByteBufferUtils.putCompressedInt(out, vLength);
|
||||
}
|
||||
ByteBufferUtils.putCompressedInt(out, commonPrefix);
|
||||
|
||||
if (commonPrefix < kv.getRowLength() + KeyValue.ROW_LENGTH_SIZE) {
|
||||
short rLen = cell.getRowLength();
|
||||
if (commonPrefix < rLen + KeyValue.ROW_LENGTH_SIZE) {
|
||||
// Previous and current rows are different. Copy the differing part of
|
||||
// the row, skip the column family, and copy the qualifier.
|
||||
out.write(curKvBuf, kv.getKeyOffset() + commonPrefix, kv.getRowLength()
|
||||
+ KeyValue.ROW_LENGTH_SIZE - commonPrefix);
|
||||
out.write(curKvBuf, kv.getQualifierOffset(), kv.getQualifierLength());
|
||||
CellUtil.writeRowKeyExcludingCommon(cell, rLen, commonPrefix, out);
|
||||
out.write(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
|
||||
} else {
|
||||
// The common part includes the whole row. As the column family is the
|
||||
// same across the whole file, it will automatically be included in the
|
||||
// common prefix, so we need not special-case it here.
|
||||
int restKeyLength = kLength - commonPrefix - KeyValue.TIMESTAMP_TYPE_SIZE;
|
||||
out.write(curKvBuf, kv.getKeyOffset() + commonPrefix, restKeyLength);
|
||||
// What we write here is the non common part of the qualifier
|
||||
int commonQualPrefix = commonPrefix - (rLen + KeyValue.ROW_LENGTH_SIZE)
|
||||
- (cell.getFamilyLength() + KeyValue.FAMILY_LENGTH_SIZE);
|
||||
out.write(cell.getQualifierArray(), cell.getQualifierOffset() + commonQualPrefix,
|
||||
cell.getQualifierLength() - commonQualPrefix);
|
||||
}
|
||||
out.write(curKvBuf, kv.getKeyOffset() + kLength - KeyValue.TIMESTAMP_TYPE_SIZE
|
||||
+ commonTimestampPrefix, KeyValue.TIMESTAMP_SIZE - commonTimestampPrefix);
|
||||
// Write non common ts part
|
||||
out.write(curTsBuf, commonTimestampPrefix, KeyValue.TIMESTAMP_SIZE - commonTimestampPrefix);
|
||||
|
||||
// Write the type if it is not the same as before.
|
||||
if ((flag & FLAG_SAME_TYPE) == 0) {
|
||||
out.write(kv.getTypeByte());
|
||||
out.write(cell.getTypeByte());
|
||||
}
|
||||
|
||||
// Write the value if it is not the same as before.
|
||||
if ((flag & FLAG_SAME_VALUE) == 0) {
|
||||
out.write(kv.getValueArray(), kv.getValueOffset(), vLength);
|
||||
out.write(cell.getValueArray(), cell.getValueOffset(), vLength);
|
||||
}
|
||||
}
|
||||
return kLength + vLength + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE;
|
||||
|
|
|
@ -22,8 +22,11 @@ import java.io.IOException;
|
|||
import java.nio.ByteBuffer;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.CellUtil;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.KeyValue.KVComparator;
|
||||
import org.apache.hadoop.hbase.KeyValueUtil;
|
||||
import org.apache.hadoop.hbase.util.ByteBufferUtils;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
|
||||
|
@ -45,34 +48,78 @@ import org.apache.hadoop.hbase.util.Bytes;
|
|||
public class PrefixKeyDeltaEncoder extends BufferedDataBlockEncoder {
|
||||
|
||||
@Override
|
||||
public int internalEncode(KeyValue kv, HFileBlockDefaultEncodingContext encodingContext,
|
||||
public int internalEncode(Cell cell, HFileBlockDefaultEncodingContext encodingContext,
|
||||
DataOutputStream out) throws IOException {
|
||||
byte[] kvBuf = kv.getBuffer();
|
||||
int klength = kv.getKeyLength();
|
||||
int vlength = kv.getValueLength();
|
||||
int klength = KeyValueUtil.keyLength(cell);
|
||||
int vlength = cell.getValueLength();
|
||||
EncodingState state = encodingContext.getEncodingState();
|
||||
if (state.prevKv == null) {
|
||||
if (state.prevCell == null) {
|
||||
// copy the key, there is no common prefix with none
|
||||
ByteBufferUtils.putCompressedInt(out, klength);
|
||||
ByteBufferUtils.putCompressedInt(out, vlength);
|
||||
ByteBufferUtils.putCompressedInt(out, 0);
|
||||
out.write(kvBuf, kv.getKeyOffset(), klength + vlength);
|
||||
CellUtil.writeFlatKey(cell, out);
|
||||
} else {
|
||||
// find a common prefix and skip it
|
||||
int common = ByteBufferUtils.findCommonPrefix(state.prevKv.getBuffer(),
|
||||
state.prevKv.getKeyOffset(), state.prevKv.getKeyLength(), kvBuf, kv.getKeyOffset(),
|
||||
kv.getKeyLength());
|
||||
int common = CellUtil.findCommonPrefixInFlatKey(cell, state.prevCell, true, true);
|
||||
ByteBufferUtils.putCompressedInt(out, klength - common);
|
||||
ByteBufferUtils.putCompressedInt(out, vlength);
|
||||
ByteBufferUtils.putCompressedInt(out, common);
|
||||
out.write(kvBuf, kv.getKeyOffset() + common, klength - common + vlength);
|
||||
writeKeyExcludingCommon(cell, common, out);
|
||||
}
|
||||
// Write the value part
|
||||
out.write(cell.getValueArray(), cell.getValueOffset(), vlength);
|
||||
int size = klength + vlength + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE;
|
||||
size += afterEncodingKeyValue(kv, out, encodingContext);
|
||||
state.prevKv = kv;
|
||||
size += afterEncodingKeyValue(cell, out, encodingContext);
|
||||
state.prevCell = cell;
|
||||
return size;
|
||||
}
|
||||
|
||||
private void writeKeyExcludingCommon(Cell cell, int commonPrefix, DataOutputStream out)
|
||||
throws IOException {
|
||||
short rLen = cell.getRowLength();
|
||||
if (commonPrefix < rLen + KeyValue.ROW_LENGTH_SIZE) {
|
||||
// Previous and current rows are different. Need to write the differing part followed by
|
||||
// cf,q,ts and type
|
||||
CellUtil.writeRowKeyExcludingCommon(cell, rLen, commonPrefix, out);
|
||||
byte fLen = cell.getFamilyLength();
|
||||
out.writeByte(fLen);
|
||||
out.write(cell.getFamilyArray(), cell.getFamilyOffset(), fLen);
|
||||
out.write(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
|
||||
out.writeLong(cell.getTimestamp());
|
||||
out.writeByte(cell.getTypeByte());
|
||||
} else {
|
||||
// The full row key part is common. CF part will be common for sure as we deal with Cells in
|
||||
// same family. Just need write the differing part in q, ts and type
|
||||
commonPrefix = commonPrefix - (rLen + KeyValue.ROW_LENGTH_SIZE)
|
||||
- (cell.getFamilyLength() + KeyValue.FAMILY_LENGTH_SIZE);
|
||||
int qLen = cell.getQualifierLength();
|
||||
int commonQualPrefix = Math.min(commonPrefix, qLen);
|
||||
int qualPartLenToWrite = qLen - commonQualPrefix;
|
||||
if (qualPartLenToWrite > 0) {
|
||||
out.write(cell.getQualifierArray(), cell.getQualifierOffset() + commonQualPrefix,
|
||||
qualPartLenToWrite);
|
||||
}
|
||||
commonPrefix -= commonQualPrefix;
|
||||
// Common part in TS also?
|
||||
if (commonPrefix > 0) {
|
||||
int commonTimestampPrefix = Math.min(commonPrefix, KeyValue.TIMESTAMP_SIZE);
|
||||
if (commonTimestampPrefix < KeyValue.TIMESTAMP_SIZE) {
|
||||
byte[] curTsBuf = Bytes.toBytes(cell.getTimestamp());
|
||||
out.write(curTsBuf, commonTimestampPrefix, KeyValue.TIMESTAMP_SIZE
|
||||
- commonTimestampPrefix);
|
||||
}
|
||||
commonPrefix -= commonTimestampPrefix;
|
||||
if (commonPrefix == 0) {
|
||||
out.writeByte(cell.getTypeByte());
|
||||
}
|
||||
} else {
|
||||
out.writeLong(cell.getTimestamp());
|
||||
out.writeByte(cell.getTypeByte());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ByteBuffer internalDecodeKeyValues(DataInputStream source, int allocateHeaderLength,
|
||||
int skipLastBytes, HFileBlockDefaultDecodingContext decodingCtx) throws IOException {
|
||||
|
|
|
@@ -24,6 +24,7 @@ import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Assert;
import org.junit.Test;

@@ -323,4 +324,48 @@ public class TestCellUtil {
    Assert.assertFalse(CellUtil.overlappingKeys(empty, b, b, c));
    Assert.assertFalse(CellUtil.overlappingKeys(empty, a, b, c));
  }
}

  @Test
  public void testFindCommonPrefixInFlatKey() {
    // The whole key matching case
    KeyValue kv1 = new KeyValue("r1".getBytes(), "f1".getBytes(), "q1".getBytes(), null);
    Assert.assertEquals(kv1.getKeyLength(),
        CellUtil.findCommonPrefixInFlatKey(kv1, kv1, true, true));
    Assert.assertEquals(kv1.getKeyLength(),
        CellUtil.findCommonPrefixInFlatKey(kv1, kv1, false, true));
    Assert.assertEquals(kv1.getKeyLength() - KeyValue.TIMESTAMP_TYPE_SIZE,
        CellUtil.findCommonPrefixInFlatKey(kv1, kv1, true, false));
    // The rk length itself mismatch
    KeyValue kv2 = new KeyValue("r12".getBytes(), "f1".getBytes(), "q1".getBytes(), null);
    Assert.assertEquals(1, CellUtil.findCommonPrefixInFlatKey(kv1, kv2, true, true));
    // part of rk is same
    KeyValue kv3 = new KeyValue("r14".getBytes(), "f1".getBytes(), "q1".getBytes(), null);
    Assert.assertEquals(KeyValue.ROW_LENGTH_SIZE + "r1".getBytes().length,
        CellUtil.findCommonPrefixInFlatKey(kv2, kv3, true, true));
    // entire rk is same but different cf name
    KeyValue kv4 = new KeyValue("r14".getBytes(), "f2".getBytes(), "q1".getBytes(), null);
    Assert.assertEquals(KeyValue.ROW_LENGTH_SIZE + kv3.getRowLength() + KeyValue.FAMILY_LENGTH_SIZE
        + "f".getBytes().length, CellUtil.findCommonPrefixInFlatKey(kv3, kv4, false, true));
    // rk and family are same and part of qualifier
    KeyValue kv5 = new KeyValue("r14".getBytes(), "f2".getBytes(), "q123".getBytes(), null);
    Assert.assertEquals(KeyValue.ROW_LENGTH_SIZE + kv3.getRowLength() + KeyValue.FAMILY_LENGTH_SIZE
        + kv4.getFamilyLength() + kv4.getQualifierLength(),
        CellUtil.findCommonPrefixInFlatKey(kv4, kv5, true, true));
    // rk, cf and q are same. ts differs
    KeyValue kv6 = new KeyValue("rk".getBytes(), 1234L);
    KeyValue kv7 = new KeyValue("rk".getBytes(), 1235L);
    // only last byte out of 8 ts bytes in ts part differs
    Assert.assertEquals(KeyValue.ROW_LENGTH_SIZE + kv6.getRowLength() + KeyValue.FAMILY_LENGTH_SIZE
        + kv6.getFamilyLength() + kv6.getQualifierLength() + 7,
        CellUtil.findCommonPrefixInFlatKey(kv6, kv7, true, true));
    // rk, cf, q and ts are same. Only type differs
    KeyValue kv8 = new KeyValue("rk".getBytes(), 1234L, Type.Delete);
    Assert.assertEquals(KeyValue.ROW_LENGTH_SIZE + kv6.getRowLength() + KeyValue.FAMILY_LENGTH_SIZE
        + kv6.getFamilyLength() + kv6.getQualifierLength() + KeyValue.TIMESTAMP_SIZE,
        CellUtil.findCommonPrefixInFlatKey(kv6, kv8, true, true));
    // With out TS_TYPE check
    Assert.assertEquals(KeyValue.ROW_LENGTH_SIZE + kv6.getRowLength() + KeyValue.FAMILY_LENGTH_SIZE
        + kv6.getFamilyLength() + kv6.getQualifierLength(),
        CellUtil.findCommonPrefixInFlatKey(kv6, kv8, true, false));
  }
}
@@ -24,6 +24,8 @@ import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.KVComparator;
import org.apache.hadoop.hbase.KeyValue.MetaComparator;

@@ -161,14 +163,14 @@ public class PrefixTreeCodec implements DataBlockEncoder{
  }

  @Override
  public int encode(KeyValue kv, HFileBlockEncodingContext encodingCtx, DataOutputStream out)
  public int encode(Cell cell, HFileBlockEncodingContext encodingCtx, DataOutputStream out)
      throws IOException {
    PrefixTreeEncodingState state = (PrefixTreeEncodingState) encodingCtx.getEncodingState();
    PrefixTreeEncoder builder = state.builder;
    builder.write(kv);
    int size = kv.getLength();
    builder.write(cell);
    int size = KeyValueUtil.length(cell);
    if (encodingCtx.getHFileContext().isIncludesMvcc()) {
      size += WritableUtils.getVIntSize(kv.getMvccVersion());
      size += WritableUtils.getVIntSize(cell.getSequenceId());
    }
    return size;
  }
@ -22,7 +22,6 @@ import java.io.DataOutputStream;
|
|||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
|
@ -31,9 +30,11 @@ import org.apache.hadoop.fs.FSDataOutputStream;
|
|||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.KeyValue.KVComparator;
|
||||
import org.apache.hadoop.hbase.KeyValueUtil;
|
||||
import org.apache.hadoop.hbase.io.compress.Compression;
|
||||
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
|
||||
|
@ -47,10 +48,9 @@ import org.apache.hadoop.io.Writable;
|
|||
@InterfaceAudience.Private
|
||||
public abstract class AbstractHFileWriter implements HFile.Writer {
|
||||
|
||||
/** Key previously appended. Becomes the last key in the file. */
|
||||
protected byte[] lastKeyBuffer = null;
|
||||
/** The Cell previously appended. Becomes the last cell in the file.*/
|
||||
protected Cell lastCell = null;
|
||||
|
||||
protected int lastKeyOffset = -1;
|
||||
protected int lastKeyLength = -1;
|
||||
|
||||
/** FileSystem stream to write into. */
|
||||
|
@ -131,11 +131,12 @@ public abstract class AbstractHFileWriter implements HFile.Writer {
|
|||
* Add last bits of metadata to file info before it is written out.
|
||||
*/
|
||||
protected void finishFileInfo() throws IOException {
|
||||
if (lastKeyBuffer != null) {
|
||||
if (lastCell != null) {
|
||||
// Make a copy. The copy is stuffed into our fileinfo map. Needs a clean
|
||||
// byte buffer. Won't take a tuple.
|
||||
fileInfo.append(FileInfo.LASTKEY, Arrays.copyOfRange(lastKeyBuffer,
|
||||
lastKeyOffset, lastKeyOffset + lastKeyLength), false);
|
||||
byte[] lastKey = new byte[lastKeyLength];
|
||||
KeyValueUtil.appendKeyTo(lastCell, lastKey, 0);
|
||||
fileInfo.append(FileInfo.LASTKEY, lastKey, false);
|
||||
}
|
||||
|
||||
// Average key length.
|
||||
|
@ -181,30 +182,24 @@ public abstract class AbstractHFileWriter implements HFile.Writer {
|
|||
}
|
||||
|
||||
/**
|
||||
* Checks that the given key does not violate the key order.
|
||||
* Checks that the given Cell's key does not violate the key order.
|
||||
*
|
||||
* @param key Key to check.
|
||||
* @param cell Cell whose key to check.
|
||||
* @return true if the key is duplicate
|
||||
* @throws IOException if the key or the key order is wrong
|
||||
*/
|
||||
protected boolean checkKey(final byte[] key, final int offset,
|
||||
final int length) throws IOException {
|
||||
protected boolean checkKey(final Cell cell) throws IOException {
|
||||
boolean isDuplicateKey = false;
|
||||
|
||||
if (key == null || length <= 0) {
|
||||
if (cell == null) {
|
||||
throw new IOException("Key cannot be null or empty");
|
||||
}
|
||||
if (lastKeyBuffer != null) {
|
||||
int keyComp = comparator.compareFlatKey(lastKeyBuffer, lastKeyOffset,
|
||||
lastKeyLength, key, offset, length);
|
||||
if (lastCell != null) {
|
||||
int keyComp = comparator.compareOnlyKeyPortion(lastCell, cell);
|
||||
|
||||
if (keyComp > 0) {
|
||||
throw new IOException("Added a key not lexically larger than"
|
||||
+ " previous key="
|
||||
+ Bytes.toStringBinary(key, offset, length)
|
||||
+ ", lastkey="
|
||||
+ Bytes.toStringBinary(lastKeyBuffer, lastKeyOffset,
|
||||
lastKeyLength));
|
||||
+ " previous. Current cell = " + cell + ", lastCell = " + lastCell);
|
||||
} else if (keyComp == 0) {
|
||||
isDuplicateKey = true;
|
||||
}
|
||||
|
|
|
@ -35,12 +35,8 @@ import java.util.Map;
|
|||
import java.util.Set;
|
||||
import java.util.SortedMap;
|
||||
import java.util.TreeMap;
|
||||
import java.util.concurrent.ArrayBlockingQueue;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import org.apache.hadoop.hbase.util.ByteStringer;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
|
@ -51,6 +47,7 @@ import org.apache.hadoop.fs.FileStatus;
|
|||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.PathFilter;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.KeyValue.KVComparator;
|
||||
|
@ -63,13 +60,13 @@ import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
|
|||
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.HFileProtos;
|
||||
import org.apache.hadoop.hbase.util.BloomFilterWriter;
|
||||
import org.apache.hadoop.hbase.util.ByteStringer;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.ChecksumType;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
import org.apache.hadoop.io.Writable;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
/**
|
||||
* File format for hbase.
|
||||
|
@ -203,7 +200,7 @@ public class HFile {
|
|||
/** Add an element to the file info map. */
|
||||
void appendFileInfo(byte[] key, byte[] value) throws IOException;
|
||||
|
||||
void append(KeyValue kv) throws IOException;
|
||||
void append(Cell cell) throws IOException;
|
||||
|
||||
void append(byte[] key, byte[] value) throws IOException;
|
||||
|
||||
|
|
|
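
With the HFile.Writer interface above now taking a Cell, any Cell implementation can be appended (KeyValue still works since it implements Cell). A hedged sketch of a call site follows; the writer-factory builder methods and the output path shown are assumptions based on the HFile API of this era, not part of the patch:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.util.Bytes;

public class HFileAppendSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    FileSystem fs = FileSystem.get(conf);
    HFileContext context = new HFileContextBuilder().withBlockSize(64 * 1024).build();
    // Builder calls below are assumed from the contemporary HFile.WriterFactory API.
    HFile.Writer writer = HFile.getWriterFactory(conf, new CacheConfig(conf))
        .withPath(fs, new Path("/tmp/example.hfile")) // hypothetical output path
        .withFileContext(context)
        .create();
    try {
      // Cells must be appended in comparator order; a KeyValue is just one Cell implementation.
      Cell cell = new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("f"),
          Bytes.toBytes("q"), Bytes.toBytes("v"));
      writer.append(cell);
    } finally {
      writer.close();
    }
  }
}
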
@@ -32,6 +32,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.fs.HFileSystem;

@@ -818,13 +819,13 @@ public class HFileBlock implements Cacheable {
    }

    /**
     * Writes the kv to this block
     * @param kv
     * Writes the Cell to this block
     * @param cell
     * @throws IOException
     */
    public void write(KeyValue kv) throws IOException{
    public void write(Cell cell) throws IOException{
      expectState(State.WRITING);
      this.unencodedDataSizeWritten += this.dataBlockEncoder.encode(kv, dataBlockEncodingCtx,
      this.unencodedDataSizeWritten += this.dataBlockEncoder.encode(cell, dataBlockEncodingCtx,
          this.userDataStream);
    }
@@ -20,7 +20,7 @@ import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;

@@ -49,13 +49,13 @@ public interface HFileDataBlockEncoder {

  /**
   * Encodes a KeyValue.
   * @param kv
   * @param cell
   * @param encodingCtx
   * @param out
   * @return unencoded kv size
   * @throws IOException
   */
  int encode(KeyValue kv, HFileBlockEncodingContext encodingCtx, DataOutputStream out)
  int encode(Cell cell, HFileBlockEncodingContext encodingCtx, DataOutputStream out)
      throws IOException;

  /**
@@ -20,6 +20,7 @@ import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;

@@ -92,9 +93,9 @@ public class HFileDataBlockEncoderImpl implements HFileDataBlockEncoder {
  }

  @Override
  public int encode(KeyValue kv, HFileBlockEncodingContext encodingCtx, DataOutputStream out)
  public int encode(Cell cell, HFileBlockEncodingContext encodingCtx, DataOutputStream out)
      throws IOException {
    return this.encoding.getEncoder().encode(kv, encodingCtx, out);
    return this.encoding.getEncoder().encode(cell, encodingCtx, out);
  }

  @Override
@ -32,8 +32,10 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.KeyValue.KVComparator;
|
||||
import org.apache.hadoop.hbase.KeyValueUtil;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFile.Writer;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFileBlock.BlockWritable;
|
||||
import org.apache.hadoop.hbase.util.BloomFilterWriter;
|
||||
|
@ -207,7 +209,7 @@ public class HFileWriterV2 extends AbstractHFileWriter {
|
|||
firstKeyInBlock = null;
|
||||
if (lastKeyLength > 0) {
|
||||
lastKeyOfPreviousBlock = new byte[lastKeyLength];
|
||||
System.arraycopy(lastKeyBuffer, lastKeyOffset, lastKeyOfPreviousBlock, 0, lastKeyLength);
|
||||
KeyValueUtil.appendKeyTo(lastCell, lastKeyOfPreviousBlock, 0);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -242,19 +244,17 @@ public class HFileWriterV2 extends AbstractHFileWriter {
|
|||
* Add key/value to file. Keys must be added in an order that agrees with the
|
||||
* Comparator passed on construction.
|
||||
*
|
||||
* @param kv
|
||||
* KeyValue to add. Cannot be empty nor null.
|
||||
* @param cell
|
||||
* Cell to add. Cannot be empty nor null.
|
||||
* @throws IOException
|
||||
*/
|
||||
@Override
|
||||
public void append(final KeyValue kv) throws IOException {
|
||||
byte[] key = kv.getBuffer();
|
||||
int koffset = kv.getKeyOffset();
|
||||
int klength = kv.getKeyLength();
|
||||
byte[] value = kv.getValueArray();
|
||||
int voffset = kv.getValueOffset();
|
||||
int vlength = kv.getValueLength();
|
||||
boolean dupKey = checkKey(key, koffset, klength);
|
||||
public void append(final Cell cell) throws IOException {
|
||||
int klength = KeyValueUtil.keyLength(cell);
|
||||
byte[] value = cell.getValueArray();
|
||||
int voffset = cell.getValueOffset();
|
||||
int vlength = cell.getValueLength();
|
||||
boolean dupKey = checkKey(cell);
|
||||
checkValue(value, voffset, vlength);
|
||||
if (!dupKey) {
|
||||
checkBlockBoundary();
|
||||
|
@ -263,7 +263,7 @@ public class HFileWriterV2 extends AbstractHFileWriter {
|
|||
if (!fsBlockWriter.isWriting())
|
||||
newBlock();
|
||||
|
||||
fsBlockWriter.write(kv);
|
||||
fsBlockWriter.write(cell);
|
||||
|
||||
totalKeyLength += klength;
|
||||
totalValueLength += vlength;
|
||||
|
@ -272,14 +272,13 @@ public class HFileWriterV2 extends AbstractHFileWriter {
|
|||
if (firstKeyInBlock == null) {
|
||||
// Copy the key.
|
||||
firstKeyInBlock = new byte[klength];
|
||||
System.arraycopy(key, koffset, firstKeyInBlock, 0, klength);
|
||||
KeyValueUtil.appendKeyTo(cell, firstKeyInBlock, 0);
|
||||
}
|
||||
|
||||
lastKeyBuffer = key;
|
||||
lastKeyOffset = koffset;
|
||||
lastCell = cell;
|
||||
lastKeyLength = klength;
|
||||
entryCount++;
|
||||
this.maxMemstoreTS = Math.max(this.maxMemstoreTS, kv.getMvccVersion());
|
||||
this.maxMemstoreTS = Math.max(this.maxMemstoreTS, cell.getSequenceId());
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.KeyValue.KVComparator;
|
||||
|
@ -77,15 +78,15 @@ public class HFileWriterV3 extends HFileWriterV2 {
|
|||
* Add key/value to file. Keys must be added in an order that agrees with the
|
||||
* Comparator passed on construction.
|
||||
*
|
||||
* @param kv
|
||||
* KeyValue to add. Cannot be empty nor null.
|
||||
* @param cell
|
||||
* Cell to add. Cannot be empty nor null.
|
||||
* @throws IOException
|
||||
*/
|
||||
@Override
|
||||
public void append(final KeyValue kv) throws IOException {
|
||||
public void append(final Cell cell) throws IOException {
|
||||
// Currently get the complete arrays
|
||||
super.append(kv);
|
||||
int tagsLength = kv.getTagsLength();
|
||||
super.append(cell);
|
||||
int tagsLength = cell.getTagsLength();
|
||||
if (tagsLength > this.maxTagsLength) {
|
||||
this.maxTagsLength = tagsLength;
|
||||
}
|
||||
|
|
|
@ -20,7 +20,10 @@ import java.io.DataOutputStream;
|
|||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.CellUtil;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.KeyValueUtil;
|
||||
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
|
||||
import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
|
||||
import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext;
|
||||
|
@ -42,28 +45,28 @@ public class NoOpDataBlockEncoder implements HFileDataBlockEncoder {
|
|||
}
|
||||
|
||||
@Override
|
||||
public int encode(KeyValue kv, HFileBlockEncodingContext encodingCtx, DataOutputStream out)
|
||||
public int encode(Cell cell, HFileBlockEncodingContext encodingCtx, DataOutputStream out)
|
||||
throws IOException {
|
||||
int klength = kv.getKeyLength();
|
||||
int vlength = kv.getValueLength();
|
||||
int klength = KeyValueUtil.keyLength(cell);
|
||||
int vlength = cell.getValueLength();
|
||||
|
||||
out.writeInt(klength);
|
||||
out.writeInt(vlength);
|
||||
out.write(kv.getBuffer(), kv.getKeyOffset(), klength);
|
||||
out.write(kv.getValueArray(), kv.getValueOffset(), vlength);
|
||||
CellUtil.writeFlatKey(cell, out);
|
||||
out.write(cell.getValueArray(), cell.getValueOffset(), vlength);
|
||||
int encodedKvSize = klength + vlength + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE;
|
||||
// Write the additional tag into the stream
|
||||
if (encodingCtx.getHFileContext().isIncludesTags()) {
|
||||
int tagsLength = kv.getTagsLength();
|
||||
int tagsLength = cell.getTagsLength();
|
||||
out.writeShort(tagsLength);
|
||||
if (tagsLength > 0) {
|
||||
out.write(kv.getTagsArray(), kv.getTagsOffset(), tagsLength);
|
||||
out.write(cell.getTagsArray(), cell.getTagsOffset(), tagsLength);
|
||||
}
|
||||
encodedKvSize += tagsLength + KeyValue.TAGS_LENGTH_SIZE;
|
||||
}
|
||||
if (encodingCtx.getHFileContext().isIncludesMvcc()) {
|
||||
WritableUtils.writeVLong(out, kv.getMvccVersion());
|
||||
encodedKvSize += WritableUtils.getVIntSize(kv.getMvccVersion());
|
||||
WritableUtils.writeVLong(out, cell.getSequenceId());
|
||||
encodedKvSize += WritableUtils.getVIntSize(cell.getSequenceId());
|
||||
}
|
||||
return encodedKvSize;
|
||||
}
|
||||
|
|
|
@@ -37,6 +37,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
@@ -698,9 +699,9 @@ public class StoreFile {
private byte[] lastBloomKey;
private int lastBloomKeyOffset, lastBloomKeyLen;
private KVComparator kvComparator;
private KeyValue lastKv = null;
private Cell lastCell = null;
private long earliestPutTs = HConstants.LATEST_TIMESTAMP;
private KeyValue lastDeleteFamilyKV = null;
private Cell lastDeleteFamilyCell = null;
private long deleteFamilyCnt = 0;

/** Bytes per Checksum */
@@ -810,28 +811,28 @@ public class StoreFile {
*
* If the timeRangeTracker is not set,
* update TimeRangeTracker to include the timestamp of this key
* @param kv
* @param cell
*/
public void trackTimestamps(final KeyValue kv) {
if (KeyValue.Type.Put.getCode() == kv.getTypeByte()) {
earliestPutTs = Math.min(earliestPutTs, kv.getTimestamp());
public void trackTimestamps(final Cell cell) {
if (KeyValue.Type.Put.getCode() == cell.getTypeByte()) {
earliestPutTs = Math.min(earliestPutTs, cell.getTimestamp());
}
if (!isTimeRangeTrackerSet) {
timeRangeTracker.includeTimestamp(kv);
timeRangeTracker.includeTimestamp(cell);
}
}

private void appendGeneralBloomfilter(final KeyValue kv) throws IOException {
private void appendGeneralBloomfilter(final Cell cell) throws IOException {
if (this.generalBloomFilterWriter != null) {
// only add to the bloom filter on a new, unique key
boolean newKey = true;
if (this.lastKv != null) {
if (this.lastCell != null) {
switch(bloomType) {
case ROW:
newKey = ! kvComparator.matchingRows(kv, lastKv);
newKey = ! kvComparator.matchingRows(cell, lastCell);
break;
case ROWCOL:
newKey = ! kvComparator.matchingRowColumn(kv, lastKv);
newKey = ! kvComparator.matchingRowColumn(cell, lastCell);
break;
case NONE:
newKey = false;
@@ -855,17 +856,17 @@ public class StoreFile {

switch (bloomType) {
case ROW:
bloomKey = kv.getRowArray();
bloomKeyOffset = kv.getRowOffset();
bloomKeyLen = kv.getRowLength();
bloomKey = cell.getRowArray();
bloomKeyOffset = cell.getRowOffset();
bloomKeyLen = cell.getRowLength();
break;
case ROWCOL:
// merge(row, qualifier)
// TODO: could save one buffer copy in case of compound Bloom
// filters when this involves creating a KeyValue
bloomKey = generalBloomFilterWriter.createBloomKey(kv.getRowArray(),
kv.getRowOffset(), kv.getRowLength(), kv.getQualifierArray(),
kv.getQualifierOffset(), kv.getQualifierLength());
bloomKey = generalBloomFilterWriter.createBloomKey(cell.getRowArray(),
cell.getRowOffset(), cell.getRowLength(), cell.getQualifierArray(),
cell.getQualifierOffset(), cell.getQualifierLength());
bloomKeyOffset = 0;
bloomKeyLen = bloomKey.length;
break;
@@ -887,14 +888,14 @@ public class StoreFile {
lastBloomKey = bloomKey;
lastBloomKeyOffset = bloomKeyOffset;
lastBloomKeyLen = bloomKeyLen;
this.lastKv = kv;
this.lastCell = cell;
}
}
}

private void appendDeleteFamilyBloomFilter(final KeyValue kv)
private void appendDeleteFamilyBloomFilter(final Cell cell)
throws IOException {
if (!CellUtil.isDeleteFamily(kv) && !CellUtil.isDeleteFamilyVersion(kv)) {
if (!CellUtil.isDeleteFamily(cell) && !CellUtil.isDeleteFamilyVersion(cell)) {
return;
}

@@ -902,22 +903,22 @@ public class StoreFile {
deleteFamilyCnt++;
if (null != this.deleteFamilyBloomFilterWriter) {
boolean newKey = true;
if (lastDeleteFamilyKV != null) {
newKey = !kvComparator.matchingRows(kv, lastDeleteFamilyKV);
if (lastDeleteFamilyCell != null) {
newKey = !kvComparator.matchingRows(cell, lastDeleteFamilyCell);
}
if (newKey) {
this.deleteFamilyBloomFilterWriter.add(kv.getRowArray(),
kv.getRowOffset(), kv.getRowLength());
this.lastDeleteFamilyKV = kv;
this.deleteFamilyBloomFilterWriter.add(cell.getRowArray(),
cell.getRowOffset(), cell.getRowLength());
this.lastDeleteFamilyCell = cell;
}
}
}

public void append(final KeyValue kv) throws IOException {
appendGeneralBloomfilter(kv);
appendDeleteFamilyBloomFilter(kv);
writer.append(kv);
trackTimestamps(kv);
public void append(final Cell cell) throws IOException {
appendGeneralBloomfilter(cell);
appendDeleteFamilyBloomFilter(cell);
writer.append(cell);
trackTimestamps(cell);
}

public Path getPath() {

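For illustration only, not part of the patch: a hedged sketch of what the widened entry point allows, namely feeding any Cell implementation to the store file writer, with KeyValue remaining just one such implementation. The writer parameter and the cell contents are hypothetical.

import java.io.IOException;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.util.Bytes;

final class AppendSketch {
  // Assumes 'writer' is an already-opened StoreFile.Writer.
  static void writeOne(StoreFile.Writer writer) throws IOException {
    Cell cell = new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("f1"),
        Bytes.toBytes("q1"), System.currentTimeMillis(), Bytes.toBytes("v1"));
    // One call now drives the bloom filters, the time range tracker and the HFile body from a Cell.
    writer.append(cell);
  }
}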
@@ -26,7 +26,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue.KVComparator;
import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
import org.apache.hadoop.hbase.util.Bytes;
@@ -159,8 +159,8 @@ public abstract class StripeMultiFileWriter implements Compactor.CellSink {
private StoreFile.Writer currentWriter;
private byte[] currentWriterEndKey;

private KeyValue lastKv;
private long kvsInCurrentWriter = 0;
private Cell lastCell;
private long cellsInCurrentWriter = 0;
private int majorRangeFromIndex = -1, majorRangeToIndex = -1;
private boolean hasAnyWriter = false;

@@ -193,21 +193,21 @@ public abstract class StripeMultiFileWriter implements Compactor.CellSink {
}

@Override
public void append(KeyValue kv) throws IOException {
public void append(Cell cell) throws IOException {
if (currentWriter == null && existingWriters.isEmpty()) {
// First append ever, do a sanity check.
sanityCheckLeft(this.boundaries.get(0),
kv.getRowArray(), kv.getRowOffset(), kv.getRowLength());
cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
}
prepareWriterFor(kv);
currentWriter.append(kv);
lastKv = kv; // for the sanity check
++kvsInCurrentWriter;
prepareWriterFor(cell);
currentWriter.append(cell);
lastCell = cell; // for the sanity check
++cellsInCurrentWriter;
}

private boolean isKvAfterCurrentWriter(KeyValue kv) {
private boolean isCellAfterCurrentWriter(Cell cell) {
return ((currentWriterEndKey != StripeStoreFileManager.OPEN_KEY) &&
(comparator.compareRows(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(),
(comparator.compareRows(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
currentWriterEndKey, 0, currentWriterEndKey.length) >= 0));
}

@@ -217,18 +217,18 @@ public abstract class StripeMultiFileWriter implements Compactor.CellSink {
while (existingWriters.size() < boundaries.size() - 1) {
createEmptyWriter();
}
if (lastKv != null) {
if (lastCell != null) {
sanityCheckRight(boundaries.get(boundaries.size() - 1),
lastKv.getRowArray(), lastKv.getRowOffset(), lastKv.getRowLength());
lastCell.getRowArray(), lastCell.getRowOffset(), lastCell.getRowLength());
}
}

private void prepareWriterFor(KeyValue kv) throws IOException {
if (currentWriter != null && !isKvAfterCurrentWriter(kv)) return; // Use same writer.
private void prepareWriterFor(Cell cell) throws IOException {
if (currentWriter != null && !isCellAfterCurrentWriter(cell)) return; // Use same writer.

stopUsingCurrentWriter();
// See if KV will be past the writer we are about to create; need to add another one.
while (isKvAfterCurrentWriter(kv)) {
while (isCellAfterCurrentWriter(cell)) {
checkCanCreateWriter();
createEmptyWriter();
}
@@ -273,9 +273,9 @@ public abstract class StripeMultiFileWriter implements Compactor.CellSink {
if (currentWriter != null) {
if (LOG.isDebugEnabled()) {
LOG.debug("Stopping to use a writer after [" + Bytes.toString(currentWriterEndKey)
+ "] row; wrote out " + kvsInCurrentWriter + " kvs");
+ "] row; wrote out " + cellsInCurrentWriter + " kvs");
}
kvsInCurrentWriter = 0;
cellsInCurrentWriter = 0;
}
currentWriter = null;
currentWriterEndKey = (existingWriters.size() + 1 == boundaries.size())
@@ -291,16 +291,16 @@ public abstract class StripeMultiFileWriter implements Compactor.CellSink {
*/
public static class SizeMultiWriter extends StripeMultiFileWriter {
private int targetCount;
private long targetKvs;
private long targetCells;
private byte[] left;
private byte[] right;

private KeyValue lastKv;
private Cell lastCell;
private StoreFile.Writer currentWriter;
protected byte[] lastRowInCurrentWriter = null;
private long kvsInCurrentWriter = 0;
private long kvsSeen = 0;
private long kvsSeenInPrevious = 0;
private long cellsInCurrentWriter = 0;
private long cellsSeen = 0;
private long cellsSeenInPrevious = 0;

/**
* @param targetCount The maximum count of writers that can be created.
@@ -311,7 +311,7 @@ public abstract class StripeMultiFileWriter implements Compactor.CellSink {
public SizeMultiWriter(int targetCount, long targetKvs, byte[] left, byte[] right) {
super();
this.targetCount = targetCount;
this.targetKvs = targetKvs;
this.targetCells = targetKvs;
this.left = left;
this.right = right;
int preallocate = Math.min(this.targetCount, 64);
@@ -320,28 +320,28 @@ public abstract class StripeMultiFileWriter implements Compactor.CellSink {
}

@Override
public void append(KeyValue kv) throws IOException {
public void append(Cell cell) throws IOException {
// If we are waiting for opportunity to close and we started writing different row,
// discard the writer and stop waiting.
boolean doCreateWriter = false;
if (currentWriter == null) {
// First append ever, do a sanity check.
sanityCheckLeft(left, kv.getRowArray(), kv.getRowOffset(), kv.getRowLength());
sanityCheckLeft(left, cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
doCreateWriter = true;
} else if (lastRowInCurrentWriter != null
&& !comparator.matchingRows(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(),
&& !comparator.matchingRows(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
lastRowInCurrentWriter, 0, lastRowInCurrentWriter.length)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Stopping to use a writer after [" + Bytes.toString(lastRowInCurrentWriter)
+ "] row; wrote out " + kvsInCurrentWriter + " kvs");
+ "] row; wrote out " + cellsInCurrentWriter + " kvs");
}
lastRowInCurrentWriter = null;
kvsInCurrentWriter = 0;
kvsSeenInPrevious += kvsSeen;
cellsInCurrentWriter = 0;
cellsSeenInPrevious += cellsSeen;
doCreateWriter = true;
}
if (doCreateWriter) {
byte[] boundary = existingWriters.isEmpty() ? left : kv.getRow(); // make a copy
byte[] boundary = existingWriters.isEmpty() ? left : cell.getRow(); // make a copy
if (LOG.isDebugEnabled()) {
LOG.debug("Creating new writer starting at [" + Bytes.toString(boundary) + "]");
}
@@ -350,25 +350,25 @@ public abstract class StripeMultiFileWriter implements Compactor.CellSink {
existingWriters.add(currentWriter);
}

currentWriter.append(kv);
lastKv = kv; // for the sanity check
++kvsInCurrentWriter;
kvsSeen = kvsInCurrentWriter;
currentWriter.append(cell);
lastCell = cell; // for the sanity check
++cellsInCurrentWriter;
cellsSeen = cellsInCurrentWriter;
if (this.sourceScanner != null) {
kvsSeen = Math.max(kvsSeen,
this.sourceScanner.getEstimatedNumberOfKvsScanned() - kvsSeenInPrevious);
cellsSeen = Math.max(cellsSeen,
this.sourceScanner.getEstimatedNumberOfKvsScanned() - cellsSeenInPrevious);
}

// If we are not already waiting for opportunity to close, start waiting if we can
// create any more writers and if the current one is too big.
if (lastRowInCurrentWriter == null
&& existingWriters.size() < targetCount
&& kvsSeen >= targetKvs) {
lastRowInCurrentWriter = kv.getRow(); // make a copy
&& cellsSeen >= targetCells) {
lastRowInCurrentWriter = cell.getRow(); // make a copy
if (LOG.isDebugEnabled()) {
LOG.debug("Preparing to start a new writer after [" + Bytes.toString(
lastRowInCurrentWriter) + "] row; observed " + kvsSeen + " kvs and wrote out "
+ kvsInCurrentWriter + " kvs");
lastRowInCurrentWriter) + "] row; observed " + cellsSeen + " kvs and wrote out "
+ cellsInCurrentWriter + " kvs");
}
}
}
@@ -376,13 +376,13 @@ public abstract class StripeMultiFileWriter implements Compactor.CellSink {
@Override
protected void commitWritersInternal() throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Stopping with " + kvsInCurrentWriter + " kvs in last writer" +
LOG.debug("Stopping with " + cellsInCurrentWriter + " kvs in last writer" +
((this.sourceScanner == null) ? "" : ("; observed estimated "
+ this.sourceScanner.getEstimatedNumberOfKvsScanned() + " KVs total")));
}
if (lastKv != null) {
if (lastCell != null) {
sanityCheckRight(
right, lastKv.getRowArray(), lastKv.getRowOffset(), lastKv.getRowLength());
right, lastCell.getRowArray(), lastCell.getRowOffset(), lastCell.getRowLength());
}

// When expired stripes were going to be merged into one, and if no writer was created during

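For illustration only, not part of the patch: the renamed check keeps the same semantics, namely that a cell rolls over to the next stripe writer once its row reaches the current writer's exclusive end key, with OPEN_KEY meaning "no upper bound". A hedged restatement using a plain byte comparison instead of the KVComparator; all names here are hypothetical.

import org.apache.hadoop.hbase.util.Bytes;

final class StripeBoundarySketch {
  // openKeyMarker stands in for StripeStoreFileManager.OPEN_KEY, which the real code compares by reference.
  static boolean rowIsPastWriter(byte[] row, byte[] currentWriterEndKey, byte[] openKeyMarker) {
    return currentWriterEndKey != openKeyMarker
        && Bytes.compareTo(row, currentWriterEndKey) >= 0;
  }
}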
@@ -23,6 +23,7 @@ import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.Type;
@@ -66,14 +67,14 @@ public class TimeRangeTracker implements Writable {
}

/**
* Update the current TimestampRange to include the timestamp from KeyValue
* Update the current TimestampRange to include the timestamp from Cell
* If the Key is of type DeleteColumn or DeleteFamily, it includes the
* entire time range from 0 to timestamp of the key.
* @param kv the KeyValue to include
* @param cell the Cell to include
*/
public void includeTimestamp(final KeyValue kv) {
includeTimestamp(kv.getTimestamp());
if (CellUtil.isDeleteColumnOrFamily(kv)) {
public void includeTimestamp(final Cell cell) {
includeTimestamp(cell.getTimestamp());
if (CellUtil.isDeleteColumnOrFamily(cell)) {
includeTimestamp(0);
}
}

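For illustration only, not part of the patch: a sketch of the intended behaviour. A Put only widens the tracked range to its own timestamp, while a DeleteFamily or DeleteColumn marker also pulls the lower bound down to 0 so that time-range pruning cannot skip the delete. The row, family and qualifier values are hypothetical.

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
import org.apache.hadoop.hbase.util.Bytes;

final class TimeRangeTrackerSketch {
  static void sketch() {
    byte[] row = Bytes.toBytes("r"), fam = Bytes.toBytes("f"), qual = Bytes.toBytes("q");
    TimeRangeTracker tracker = new TimeRangeTracker();
    tracker.includeTimestamp(new KeyValue(row, fam, qual, 100L, KeyValue.Type.Put));
    // tracked range now covers [100, 100]
    tracker.includeTimestamp(new KeyValue(row, fam, null, 200L, KeyValue.Type.DeleteFamily));
    // range widened to [0, 200] because of the delete-family marker
  }
}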
@@ -75,11 +75,8 @@ public abstract class Compactor {
HConstants.MIN_KEEP_SEQID_PERIOD), HConstants.MIN_KEEP_SEQID_PERIOD);
}

/**
* TODO: Replace this with CellOutputStream when StoreFile.Writer uses cells.
*/
public interface CellSink {
void append(KeyValue kv) throws IOException;
void append(Cell cell) throws IOException;
}

public CompactionProgress getProgress() {

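For illustration only, not part of the patch: with the interface widened, any compaction sink only has to accept Cells. A minimal hypothetical implementer (the real implementers are StoreFile.Writer and the stripe writers above).

import java.io.IOException;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.regionserver.compactions.Compactor;

class CountingCellSink implements Compactor.CellSink {
  long cells; // simply counts whatever the compaction pushes through the sink

  @Override
  public void append(Cell cell) throws IOException {
    cells++;
  }
}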
@@ -155,7 +155,7 @@ public class CompoundBloomFilterWriter extends CompoundBloomFilterBase
* as defined by the comparator this compound Bloom filter is configured
* with. For efficiency, key monotonicity is not checked here. See
* {@link org.apache.hadoop.hbase.regionserver.StoreFile.Writer#append(
* org.apache.hadoop.hbase.KeyValue)} for the details of deduplication.
* org.apache.hadoop.hbase.Cell)} for the details of deduplication.
*/
@Override
public void add(byte[] bloomKey, int keyOffset, int keyLength) {

@@ -37,8 +37,12 @@ class KeySampler {
public KeySampler(Random random, byte [] first, byte [] last,
DiscreteRNG keyLenRNG) {
this.random = random;
min = keyPrefixToInt(first);
max = keyPrefixToInt(last);
int firstLen = keyPrefixToInt(first);
int lastLen = keyPrefixToInt(last);
min = Math.min(firstLen, lastLen);
max = Math.max(firstLen, lastLen);
System.out.println(min);
System.out.println(max);
this.keyLenRNG = keyLenRNG;
}

@@ -52,7 +56,11 @@ class KeySampler {
public void next(BytesWritable key) {
key.setSize(Math.max(MIN_KEY_LEN, keyLenRNG.nextInt()));
random.nextBytes(key.get());
int n = random.nextInt(max - min) + min;
int rnd = 0;
if (max != min) {
rnd = random.nextInt(max - min);
}
int n = rnd + min;
byte[] b = key.get();
b[0] = (byte) (n >> 24);
b[1] = (byte) (n >> 16);

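For illustration only, not part of the patch: the guard matters because java.util.Random.nextInt(bound) throws IllegalArgumentException for bound <= 0, so when both sampled key prefixes collapse to the same int the old unguarded call would fail. A standalone restatement with hypothetical names:

import java.util.Random;

final class KeySamplerSketch {
  // Assumes min <= max, as guaranteed by the Math.min/Math.max normalization above.
  static int sample(Random random, int min, int max) {
    int rnd = 0;
    if (max != min) {
      rnd = random.nextInt(max - min); // safe: bound is strictly positive here
    }
    return rnd + min;                  // with max == min this simply returns min
  }
}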
@@ -369,7 +369,8 @@ public class TestHFile extends HBaseTestCase {
.withOutputStream(fout)
.withFileContext(meta)
.create();
writer.append("foo".getBytes(), "value".getBytes());
KeyValue kv = new KeyValue("foo".getBytes(), "f1".getBytes(), null, "value".getBytes());
writer.append(kv);
writer.close();
fout.close();
Reader reader = HFile.createReader(fs, mFile, cacheConf, conf);

@@ -36,9 +36,11 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
@@ -470,7 +472,9 @@ public class TestHFileBlockCompatibility {
return userDataStream;
}

public void write(KeyValue kv) throws IOException{
@Override
public void write(Cell c) throws IOException {
KeyValue kv = KeyValueUtil.ensureKeyValue(c);
expectState(State.WRITING);
this.dataBlockEncoder.encode(kv, dataBlockEncodingCtx, this.userDataStream);
this.unencodedDataSizeWritten += kv.getLength();

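For illustration only, not part of the patch: the test's legacy encoder path still needs a KeyValue, so the incoming Cell is bridged with KeyValueUtil.ensureKeyValue. To the best of my reading of this codebase it returns the instance unchanged when it already is a KeyValue and otherwise copies the Cell into one; the wrapper method below is hypothetical.

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;

final class EnsureKeyValueSketch {
  static KeyValue toKeyValue(Cell c) {
    // Same bridge as used in write(Cell) above.
    return KeyValueUtil.ensureKeyValue(c);
  }
}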
@@ -165,7 +165,8 @@ public class TestHFileEncryption {
.withOutputStream(out)
.withFileContext(fileContext)
.create();
writer.append("foo".getBytes(), "value".getBytes());
KeyValue kv = new KeyValue("foo".getBytes(), "f1".getBytes(), null, "value".getBytes());
writer.append(kv);
writer.close();
out.close();

@@ -71,10 +71,23 @@ public class TestHFileInlineToRootChunkConversion {
sb.setLength(0);

byte[] k = Bytes.toBytes(keyStr);
System.out.println("Key: " + Bytes.toString(k));
keys.add(k);
System.out.println("RowKey: " + Bytes.toString(k));
byte[] f = "f1".getBytes();
byte[] q = "q1".getBytes();
int keySize = (int) KeyValue.getKeyDataStructureSize(k.length, f.length, q.length);
byte[] bytes = new byte[keySize];
int pos = 0;
pos = Bytes.putShort(bytes, pos, (short) (k.length & 0x0000ffff));
pos = Bytes.putBytes(bytes, pos, k, 0, k.length);
pos = Bytes.putByte(bytes, pos, (byte) f.length);
pos = Bytes.putBytes(bytes, pos, f, 0, f.length);
pos = Bytes.putBytes(bytes, pos, q, 0, q.length);
pos = Bytes.putLong(bytes, pos, System.currentTimeMillis());
pos = Bytes.putByte(bytes, pos, KeyValue.Type.Put.getCode());

keys.add(bytes);
byte[] v = Bytes.toBytes("value" + i);
hfw.append(k, v);
hfw.append(bytes, v);
}
hfw.close();

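For illustration only, not part of the patch: the hand-rolled layout above (2-byte row length, row, 1-byte family length, family, qualifier, 8-byte timestamp, 1-byte type) is the same flat key a KeyValue serializes, so an equivalent way to build it, with hypothetical family and qualifier values, would be:

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.util.Bytes;

final class FlatKeySketch {
  static byte[] flatKeyFor(byte[] row) {
    KeyValue kv = new KeyValue(row, Bytes.toBytes("f1"), Bytes.toBytes("q1"),
        System.currentTimeMillis(), KeyValue.Type.Put);
    return kv.getKey(); // copies out just the key portion, in the layout described above
  }
}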
@@ -58,6 +58,8 @@ import org.junit.experimental.categories.Category;
*/
@Category(MediumTests.class)
public class TestHFileSeek extends TestCase {
private static final byte[] CF = "f1".getBytes();
private static final byte[] QUAL = "q1".getBytes();
private static final boolean USE_PREAD = true;
private MyOptions options;
private Configuration conf;
@@ -70,8 +72,6 @@ public class TestHFileSeek extends TestCase {

private static final Log LOG = LogFactory.getLog(TestHFileSeek.class);

private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

@Override
public void setUp() throws IOException {
if (options == null) {
@@ -151,9 +151,10 @@ public class TestHFileSeek extends TestCase {
System.arraycopy(key.getBytes(), 0, k, 0, key.getLength());
byte [] v = new byte [val.getLength()];
System.arraycopy(val.getBytes(), 0, v, 0, key.getLength());
writer.append(k, v);
totalBytes += key.getLength();
totalBytes += val.getLength();
KeyValue kv = new KeyValue(k, CF, QUAL, v);
writer.append(kv);
totalBytes += kv.getKeyLength();
totalBytes += kv.getValueLength();
}
timer.stop();
}