HBASE-17161 MOB : Make ref cell creation more efficient.
This commit is contained in:
parent
cb5c4c146f
commit
4b3ffffa09
|
@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
|
|||
import org.apache.hadoop.hbase.io.HeapSize;
|
||||
import org.apache.hadoop.hbase.io.TagCompressionContext;
|
||||
import org.apache.hadoop.hbase.io.util.Dictionary;
|
||||
import org.apache.hadoop.hbase.io.util.StreamUtils;
|
||||
import org.apache.hadoop.hbase.util.ByteBufferUtils;
|
||||
import org.apache.hadoop.hbase.util.ByteRange;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
|
@ -433,6 +434,13 @@ public final class CellUtil {
|
|||
return new TagRewriteCell(cell, tags);
|
||||
}
|
||||
|
||||
public static Cell createCell(Cell cell, byte[] value, byte[] tags) {
|
||||
if (cell instanceof ByteBufferCell) {
|
||||
return new ValueAndTagRewriteByteBufferCell((ByteBufferCell) cell, value, tags);
|
||||
}
|
||||
return new ValueAndTagRewriteCell(cell, value, tags);
|
||||
}
|
||||
|
||||
/**
|
||||
* This can be used when a Cell has to change with addition/removal of one or more tags. This is an
|
||||
* efficient way to do so in which only the tags bytes part need to recreated and copied. All other
|
||||
|
@ -556,9 +564,9 @@ public final class CellUtil {
|
|||
|
||||
@Override
|
||||
public long heapSize() {
|
||||
long sum = HEAP_SIZE_OVERHEAD + CellUtil.estimatedHeapSizeOf(cell) - cell.getTagsLength();
|
||||
long sum = HEAP_SIZE_OVERHEAD + CellUtil.estimatedHeapSizeOf(cell);
|
||||
if (this.tags != null) {
|
||||
sum += ClassSize.ARRAY + this.tags.length;
|
||||
sum += ClassSize.sizeOf(this.tags, this.tags.length);
|
||||
}
|
||||
return sum;
|
||||
}
|
||||
|
@ -605,7 +613,7 @@ public final class CellUtil {
|
|||
|
||||
@Override
|
||||
public void write(ByteBuffer buf, int offset) {
|
||||
offset = KeyValueUtil.appendToByteBuffer(this.cell, buf, offset, false);
|
||||
offset = KeyValueUtil.appendTo(this.cell, buf, offset, false);
|
||||
int tagsLen = this.tags == null ? 0 : this.tags.length;
|
||||
if (tagsLen > 0) {
|
||||
offset = ByteBufferUtils.putAsShort(buf, offset, tagsLen);
|
||||
|
@ -763,9 +771,9 @@ public final class CellUtil {
|
|||
|
||||
@Override
|
||||
public long heapSize() {
|
||||
long sum = HEAP_SIZE_OVERHEAD + CellUtil.estimatedHeapSizeOf(cell) - cell.getTagsLength();
|
||||
long sum = HEAP_SIZE_OVERHEAD + CellUtil.estimatedHeapSizeOf(cell);
|
||||
if (this.tags != null) {
|
||||
sum += ClassSize.ARRAY + this.tags.length;
|
||||
sum += ClassSize.sizeOf(this.tags, this.tags.length);
|
||||
}
|
||||
return sum;
|
||||
}
|
||||
|
@ -794,7 +802,7 @@ public final class CellUtil {
|
|||
|
||||
@Override
|
||||
public void write(ByteBuffer buf, int offset) {
|
||||
offset = KeyValueUtil.appendToByteBuffer(this.cell, buf, offset, false);
|
||||
offset = KeyValueUtil.appendTo(this.cell, buf, offset, false);
|
||||
int tagsLen = this.tags == null ? 0 : this.tags.length;
|
||||
if (tagsLen > 0) {
|
||||
offset = ByteBufferUtils.putAsShort(buf, offset, tagsLen);
|
||||
|
@ -871,6 +879,184 @@ public final class CellUtil {
|
|||
}
|
||||
}
|
||||
|
||||
@InterfaceAudience.Private
|
||||
private static class ValueAndTagRewriteCell extends TagRewriteCell {
|
||||
|
||||
protected byte[] value;
|
||||
|
||||
public ValueAndTagRewriteCell(Cell cell, byte[] value, byte[] tags) {
|
||||
super(cell, tags);
|
||||
this.value = value;
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] getValueArray() {
|
||||
return this.value;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getValueOffset() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getValueLength() {
|
||||
return this.value == null ? 0 : this.value.length;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long heapSize() {
|
||||
long sum = ClassSize.REFERENCE + super.heapSize();
|
||||
if (this.value != null) {
|
||||
sum += ClassSize.sizeOf(this.value, this.value.length);
|
||||
}
|
||||
return sum;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int write(OutputStream out, boolean withTags) throws IOException {
|
||||
return write(out, withTags, this.cell, this.value, this.tags);
|
||||
}
|
||||
|
||||
// Made into a static method so as to reuse the logic within ValueAndTagRewriteByteBufferCell
|
||||
static int write(OutputStream out, boolean withTags, Cell cell, byte[] value, byte[] tags)
|
||||
throws IOException {
|
||||
int valLen = value == null ? 0 : value.length;
|
||||
ByteBufferUtils.putInt(out, KeyValueUtil.keyLength(cell));// Key length
|
||||
ByteBufferUtils.putInt(out, valLen);// Value length
|
||||
int len = 2 * Bytes.SIZEOF_INT;
|
||||
len += CellUtil.writeFlatKey(cell, out);// Key
|
||||
if (valLen > 0) out.write(value);// Value
|
||||
len += valLen;
|
||||
if (withTags && tags != null) {
|
||||
// Write the tagsLength 2 bytes
|
||||
out.write((byte) (0xff & (tags.length >> 8)));
|
||||
out.write((byte) (0xff & tags.length));
|
||||
out.write(tags);
|
||||
len += KeyValue.TAGS_LENGTH_SIZE + tags.length;
|
||||
}
|
||||
return len;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getSerializedSize(boolean withTags) {
|
||||
return super.getSerializedSize(withTags) - this.cell.getValueLength() + this.value.length;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(ByteBuffer buf, int offset) {
|
||||
write(buf, offset, this.cell, this.value, this.tags);
|
||||
}
|
||||
|
||||
// Made into a static method so as to reuse the logic within ValueAndTagRewriteByteBufferCell
|
||||
static void write(ByteBuffer buf, int offset, Cell cell, byte[] value, byte[] tags) {
|
||||
offset = ByteBufferUtils.putInt(buf, offset, KeyValueUtil.keyLength(cell));// Key length
|
||||
offset = ByteBufferUtils.putInt(buf, offset, value.length);// Value length
|
||||
offset = KeyValueUtil.appendKeyTo(cell, buf, offset);
|
||||
ByteBufferUtils.copyFromArrayToBuffer(buf, offset, value, 0, value.length);
|
||||
offset += value.length;
|
||||
int tagsLen = tags == null ? 0 : tags.length;
|
||||
if (tagsLen > 0) {
|
||||
offset = ByteBufferUtils.putAsShort(buf, offset, tagsLen);
|
||||
ByteBufferUtils.copyFromArrayToBuffer(buf, offset, tags, 0, tagsLen);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public long heapOverhead() {
|
||||
long overhead = super.heapOverhead() + ClassSize.REFERENCE;
|
||||
if (this.value != null) {
|
||||
overhead += ClassSize.ARRAY;
|
||||
}
|
||||
return overhead;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Cell deepClone() {
|
||||
Cell clonedBaseCell = ((ExtendedCell) this.cell).deepClone();
|
||||
return new ValueAndTagRewriteCell(clonedBaseCell, this.tags, this.value);
|
||||
}
|
||||
}
|
||||
|
||||
@InterfaceAudience.Private
|
||||
private static class ValueAndTagRewriteByteBufferCell extends TagRewriteByteBufferCell {
|
||||
|
||||
protected byte[] value;
|
||||
|
||||
public ValueAndTagRewriteByteBufferCell(ByteBufferCell cell, byte[] value, byte[] tags) {
|
||||
super(cell, tags);
|
||||
this.value = value;
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] getValueArray() {
|
||||
return this.value;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getValueOffset() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getValueLength() {
|
||||
return this.value == null ? 0 : this.value.length;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuffer getValueByteBuffer() {
|
||||
return ByteBuffer.wrap(this.value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getValuePosition() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long heapSize() {
|
||||
long sum = ClassSize.REFERENCE + super.heapSize();
|
||||
if (this.value != null) {
|
||||
sum += ClassSize.sizeOf(this.value, this.value.length);
|
||||
}
|
||||
return sum;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int write(OutputStream out, boolean withTags) throws IOException {
|
||||
return ValueAndTagRewriteCell.write(out, withTags, this.cell, this.value, this.tags);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getSerializedSize(boolean withTags) {
|
||||
return super.getSerializedSize(withTags) - this.cell.getValueLength() + this.value.length;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(ByteBuffer buf, int offset) {
|
||||
ValueAndTagRewriteCell.write(buf, offset, this.cell, this.value, this.tags);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long heapOverhead() {
|
||||
long overhead = super.heapOverhead() + ClassSize.REFERENCE;
|
||||
if (this.value != null) {
|
||||
overhead += ClassSize.ARRAY;
|
||||
}
|
||||
return overhead;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Cell deepClone() {
|
||||
Cell clonedBaseCell = ((ExtendedCell) this.cell).deepClone();
|
||||
if (clonedBaseCell instanceof ByteBufferCell) {
|
||||
return new ValueAndTagRewriteByteBufferCell((ByteBufferCell) clonedBaseCell, this.tags,
|
||||
this.value);
|
||||
}
|
||||
return new ValueAndTagRewriteCell(clonedBaseCell, this.tags, this.value);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param cellScannerables
|
||||
* @return CellScanner interface over <code>cellIterables</code>
|
||||
|
@ -1577,6 +1763,34 @@ public final class CellUtil {
|
|||
out.writeByte(cell.getTypeByte());
|
||||
}
|
||||
|
||||
public static int writeFlatKey(Cell cell, OutputStream out) throws IOException {
|
||||
short rowLen = cell.getRowLength();
|
||||
byte fLen = cell.getFamilyLength();
|
||||
int qLen = cell.getQualifierLength();
|
||||
// Using just one if/else loop instead of every time checking before writing every
|
||||
// component of cell
|
||||
if (cell instanceof ByteBufferCell) {
|
||||
StreamUtils.writeShort(out, rowLen);
|
||||
ByteBufferUtils.copyBufferToStream(out, ((ByteBufferCell) cell).getRowByteBuffer(),
|
||||
((ByteBufferCell) cell).getRowPosition(), rowLen);
|
||||
out.write(fLen);
|
||||
ByteBufferUtils.copyBufferToStream(out, ((ByteBufferCell) cell).getFamilyByteBuffer(),
|
||||
((ByteBufferCell) cell).getFamilyPosition(), fLen);
|
||||
ByteBufferUtils.copyBufferToStream(out, ((ByteBufferCell) cell).getQualifierByteBuffer(),
|
||||
((ByteBufferCell) cell).getQualifierPosition(), qLen);
|
||||
} else {
|
||||
StreamUtils.writeShort(out, rowLen);
|
||||
out.write(cell.getRowArray(), cell.getRowOffset(), rowLen);
|
||||
out.write(fLen);
|
||||
out.write(cell.getFamilyArray(), cell.getFamilyOffset(), fLen);
|
||||
out.write(cell.getQualifierArray(), cell.getQualifierOffset(), qLen);
|
||||
}
|
||||
StreamUtils.writeLong(out, cell.getTimestamp());
|
||||
out.write(cell.getTypeByte());
|
||||
return Bytes.SIZEOF_SHORT + rowLen + Bytes.SIZEOF_BYTE + fLen + qLen + Bytes.SIZEOF_LONG
|
||||
+ Bytes.SIZEOF_BYTE;
|
||||
}
|
||||
|
||||
/**
|
||||
* Writes the row from the given cell to the output stream
|
||||
* @param out The outputstream to which the data has to be written
|
||||
|
@ -2018,6 +2232,20 @@ public final class CellUtil {
|
|||
return Bytes.toLong(cell.getValueArray(), cell.getValueOffset());
|
||||
}
|
||||
|
||||
/**
|
||||
* Converts the value bytes of the given cell into a int value
|
||||
*
|
||||
* @param cell
|
||||
* @return value as int
|
||||
*/
|
||||
public static int getValueAsInt(Cell cell) {
|
||||
if (cell instanceof ByteBufferCell) {
|
||||
return ByteBufferUtils.toInt(((ByteBufferCell) cell).getValueByteBuffer(),
|
||||
((ByteBufferCell) cell).getValuePosition());
|
||||
}
|
||||
return Bytes.toInt(cell.getValueArray(), cell.getValueOffset());
|
||||
}
|
||||
|
||||
/**
|
||||
* Converts the value bytes of the given cell into a double value
|
||||
*
|
||||
|
@ -2958,7 +3186,7 @@ public final class CellUtil {
|
|||
// Normally all Cell impls within Server will be of type ExtendedCell. Just considering the
|
||||
// other case also. The data fragments within Cell is copied into buf as in KeyValue
|
||||
// serialization format only.
|
||||
KeyValueUtil.appendToByteBuffer(cell, buf, offset, true);
|
||||
KeyValueUtil.appendTo(cell, buf, offset, true);
|
||||
}
|
||||
if (buf.hasArray()) {
|
||||
KeyValue newKv;
|
||||
|
|
|
@ -62,7 +62,7 @@ public interface ExtendedCell extends Cell, SettableSequenceId, SettableTimestam
|
|||
int getSerializedSize(boolean withTags);
|
||||
|
||||
/**
|
||||
* Write the given Cell into the given buf's offset.
|
||||
* Write this Cell into the given buf's offset in a {@link KeyValue} format.
|
||||
* @param buf The buffer where to write the Cell.
|
||||
* @param offset The offset within buffer, to write the Cell.
|
||||
*/
|
||||
|
|
|
@ -171,9 +171,20 @@ public class KeyValueUtil {
|
|||
/**
|
||||
* Copy the Cell content into the passed buf in KeyValue serialization format.
|
||||
*/
|
||||
public static int appendToByteBuffer(Cell cell, ByteBuffer buf, int offset, boolean withTags) {
|
||||
public static int appendTo(Cell cell, ByteBuffer buf, int offset, boolean withTags) {
|
||||
offset = ByteBufferUtils.putInt(buf, offset, keyLength(cell));// Key length
|
||||
offset = ByteBufferUtils.putInt(buf, offset, cell.getValueLength());// Value length
|
||||
offset = appendKeyTo(cell, buf, offset);
|
||||
offset = CellUtil.copyValueTo(cell, buf, offset);// Value bytes
|
||||
int tagsLength = cell.getTagsLength();
|
||||
if (withTags && (tagsLength > 0)) {
|
||||
offset = ByteBufferUtils.putAsShort(buf, offset, tagsLength);// Tags length
|
||||
offset = CellUtil.copyTagTo(cell, buf, offset);// Tags bytes
|
||||
}
|
||||
return offset;
|
||||
}
|
||||
|
||||
public static int appendKeyTo(Cell cell, ByteBuffer buf, int offset) {
|
||||
offset = ByteBufferUtils.putShort(buf, offset, cell.getRowLength());// RK length
|
||||
offset = CellUtil.copyRowTo(cell, buf, offset);// Row bytes
|
||||
offset = ByteBufferUtils.putByte(buf, offset, cell.getFamilyLength());// CF length
|
||||
|
@ -181,12 +192,6 @@ public class KeyValueUtil {
|
|||
offset = CellUtil.copyQualifierTo(cell, buf, offset);// Qualifier bytes
|
||||
offset = ByteBufferUtils.putLong(buf, offset, cell.getTimestamp());// TS
|
||||
offset = ByteBufferUtils.putByte(buf, offset, cell.getTypeByte());// Type
|
||||
offset = CellUtil.copyValueTo(cell, buf, offset);// Value bytes
|
||||
int tagsLength = cell.getTagsLength();
|
||||
if (withTags && (tagsLength > 0)) {
|
||||
offset = ByteBufferUtils.putAsShort(buf, offset, tagsLength);// Tags length
|
||||
offset = CellUtil.copyTagTo(cell, buf, offset);// Tags bytes
|
||||
}
|
||||
return offset;
|
||||
}
|
||||
|
||||
|
|
|
@ -247,6 +247,23 @@ public final class TagUtil {
|
|||
return tags;
|
||||
}
|
||||
|
||||
public static byte[] concatTags(byte[] tags, Cell cell) {
|
||||
int cellTagsLen = cell.getTagsLength();
|
||||
if (cellTagsLen == 0) {
|
||||
// If no Tags, return early.
|
||||
return tags;
|
||||
}
|
||||
byte[] b = new byte[tags.length + cellTagsLen];
|
||||
int pos = Bytes.putBytes(b, 0, tags, 0, tags.length);
|
||||
if (cell instanceof ByteBufferCell) {
|
||||
ByteBufferUtils.copyFromBufferToArray(b, ((ByteBufferCell) cell).getTagsByteBuffer(),
|
||||
((ByteBufferCell) cell).getTagsPosition(), pos, cellTagsLen);
|
||||
} else {
|
||||
Bytes.putBytes(b, pos, cell.getTagsArray(), cell.getTagsOffset(), cellTagsLen);
|
||||
}
|
||||
return b;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Carry forward the TTL tag.
|
||||
*/
|
||||
|
|
|
@ -26,13 +26,10 @@ import org.apache.commons.logging.Log;
|
|||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.ArrayBackedTag;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.CellUtil;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.KeyValueUtil;
|
||||
import org.apache.hadoop.hbase.Tag;
|
||||
import org.apache.hadoop.hbase.TagType;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.regionserver.CellSink;
|
||||
|
@ -184,8 +181,6 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
|
|||
byte[] fileName = null;
|
||||
StoreFileWriter mobFileWriter = null, delFileWriter = null;
|
||||
long mobCells = 0, deleteMarkersCount = 0;
|
||||
Tag tableNameTag = new ArrayBackedTag(TagType.MOB_TABLE_NAME_TAG_TYPE,
|
||||
store.getTableName().getName());
|
||||
long cellsCountCompactedToMob = 0, cellsCountCompactedFromMob = 0;
|
||||
long cellsSizeCompactedToMob = 0, cellsSizeCompactedFromMob = 0;
|
||||
try {
|
||||
|
@ -250,7 +245,8 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
|
|||
mobCells++;
|
||||
// append the original keyValue in the mob file.
|
||||
mobFileWriter.append(c);
|
||||
KeyValue reference = MobUtils.createMobRefKeyValue(c, fileName, tableNameTag);
|
||||
Cell reference = MobUtils.createMobRefCell(c, fileName,
|
||||
this.mobStore.getRefCellTags());
|
||||
// write the cell whose value is the path of a mob file to the store file.
|
||||
writer.append(reference);
|
||||
cellsCountCompactedToMob++;
|
||||
|
|
|
@ -27,12 +27,9 @@ import org.apache.commons.logging.Log;
|
|||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.ArrayBackedTag;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.Tag;
|
||||
import org.apache.hadoop.hbase.TagType;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
|
||||
import org.apache.hadoop.hbase.regionserver.DefaultStoreFlusher;
|
||||
|
@ -167,8 +164,6 @@ public class DefaultMobStoreFlusher extends DefaultStoreFlusher {
|
|||
// the relative path is mobFiles
|
||||
byte[] fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
|
||||
try {
|
||||
Tag tableNameTag = new ArrayBackedTag(TagType.MOB_TABLE_NAME_TAG_TYPE,
|
||||
store.getTableName().getName());
|
||||
List<Cell> cells = new ArrayList<Cell>();
|
||||
boolean hasMore;
|
||||
ScannerContext scannerContext =
|
||||
|
@ -192,7 +187,8 @@ public class DefaultMobStoreFlusher extends DefaultStoreFlusher {
|
|||
|
||||
// append the tags to the KeyValue.
|
||||
// The key is same, the value is the filename of the mob file
|
||||
KeyValue reference = MobUtils.createMobRefKeyValue(c, fileName, tableNameTag);
|
||||
Cell reference = MobUtils.createMobRefCell(c, fileName,
|
||||
this.mobStore.getRefCellTags());
|
||||
writer.append(reference);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -48,10 +48,10 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
|
|||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.Tag;
|
||||
import org.apache.hadoop.hbase.TagType;
|
||||
import org.apache.hadoop.hbase.TagUtil;
|
||||
import org.apache.hadoop.hbase.backup.HFileArchiver;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
|
@ -424,7 +424,7 @@ public final class MobUtils {
|
|||
* cloning the snapshot.
|
||||
* @return The mob reference KeyValue.
|
||||
*/
|
||||
public static KeyValue createMobRefKeyValue(Cell cell, byte[] fileName, Tag tableNameTag) {
|
||||
public static Cell createMobRefCell(Cell cell, byte[] fileName, Tag tableNameTag) {
|
||||
// Append the tags to the KeyValue.
|
||||
// The key is same, the value is the filename of the mob file
|
||||
List<Tag> tags = new ArrayList<Tag>();
|
||||
|
@ -437,15 +437,13 @@ public final class MobUtils {
|
|||
// snapshot for mob files.
|
||||
tags.add(tableNameTag);
|
||||
// Add the existing tags.
|
||||
tags.addAll(CellUtil.getTags(cell));
|
||||
int valueLength = cell.getValueLength();
|
||||
byte[] refValue = Bytes.add(Bytes.toBytes(valueLength), fileName);
|
||||
KeyValue reference = new KeyValue(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
|
||||
cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
|
||||
cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
|
||||
cell.getTimestamp(), KeyValue.Type.Put, refValue, 0, refValue.length, tags);
|
||||
reference.setSequenceId(cell.getSequenceId());
|
||||
return reference;
|
||||
TagUtil.carryForwardTags(tags, cell);
|
||||
return createMobRefCell(cell, fileName, TagUtil.fromList(tags));
|
||||
}
|
||||
|
||||
public static Cell createMobRefCell(Cell cell, byte[] fileName, byte[] refCellTags) {
|
||||
byte[] refValue = Bytes.add(Bytes.toBytes(cell.getValueLength()), fileName);
|
||||
return CellUtil.createCell(cell, refValue, TagUtil.concatTags(refCellTags, cell));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -666,7 +664,7 @@ public final class MobUtils {
|
|||
* @return The real mob value length.
|
||||
*/
|
||||
public static int getMobValueLength(Cell cell) {
|
||||
return Bytes.toInt(cell.getValueArray(), cell.getValueOffset(), Bytes.SIZEOF_INT);
|
||||
return CellUtil.getValueAsInt(cell);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -43,10 +43,10 @@ import org.apache.hadoop.hbase.Cell;
|
|||
import org.apache.hadoop.hbase.CellComparator;
|
||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.Tag;
|
||||
import org.apache.hadoop.hbase.TagType;
|
||||
import org.apache.hadoop.hbase.TagUtil;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||
|
@ -92,7 +92,7 @@ public class PartitionedMobCompactor extends MobCompactor {
|
|||
private final Path tempPath;
|
||||
private final Path bulkloadPath;
|
||||
private final CacheConfig compactionCacheConfig;
|
||||
private final Tag tableNameTag;
|
||||
private final byte[] refCellTags;
|
||||
private Encryption.Context cryptoContext = Encryption.Context.NONE;
|
||||
|
||||
public PartitionedMobCompactor(Configuration conf, FileSystem fs, TableName tableName,
|
||||
|
@ -113,7 +113,11 @@ public class PartitionedMobCompactor extends MobCompactor {
|
|||
Configuration copyOfConf = new Configuration(conf);
|
||||
copyOfConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0f);
|
||||
compactionCacheConfig = new CacheConfig(copyOfConf);
|
||||
tableNameTag = new ArrayBackedTag(TagType.MOB_TABLE_NAME_TAG_TYPE, tableName.getName());
|
||||
List<Tag> tags = new ArrayList<>(2);
|
||||
tags.add(MobConstants.MOB_REF_TAG);
|
||||
Tag tableNameTag = new ArrayBackedTag(TagType.MOB_TABLE_NAME_TAG_TYPE, tableName.getName());
|
||||
tags.add(tableNameTag);
|
||||
this.refCellTags = TagUtil.fromList(tags);
|
||||
cryptoContext = EncryptionUtil.createEncryptionContext(copyOfConf, column);
|
||||
}
|
||||
|
||||
|
@ -421,7 +425,7 @@ public class PartitionedMobCompactor extends MobCompactor {
|
|||
// write the mob cell to the mob file.
|
||||
writer.append(cell);
|
||||
// write the new reference cell to the store file.
|
||||
KeyValue reference = MobUtils.createMobRefKeyValue(cell, fileName, tableNameTag);
|
||||
Cell reference = MobUtils.createMobRefCell(cell, fileName, this.refCellTags);
|
||||
refFileWriter.append(reference);
|
||||
mobCells++;
|
||||
}
|
||||
|
|
|
@ -32,6 +32,7 @@ import org.apache.commons.logging.LogFactory;
|
|||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.ArrayBackedTag;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.CellComparator;
|
||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||
|
@ -40,6 +41,7 @@ import org.apache.hadoop.hbase.KeyValue;
|
|||
import org.apache.hadoop.hbase.KeyValue.Type;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.Tag;
|
||||
import org.apache.hadoop.hbase.TagType;
|
||||
import org.apache.hadoop.hbase.TagUtil;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
|
@ -101,6 +103,13 @@ public class HMobStore extends HStore {
|
|||
private TableName tableLockName;
|
||||
private Map<String, List<Path>> map = new ConcurrentHashMap<String, List<Path>>();
|
||||
private final IdLock keyLock = new IdLock();
|
||||
// When we add a MOB reference cell to the HFile, we will add 2 tags along with it
|
||||
// 1. A ref tag with type TagType.MOB_REFERENCE_TAG_TYPE. This just denote this this cell is not
|
||||
// original one but a ref to another MOB Cell.
|
||||
// 2. Table name tag. It's very useful in cloning the snapshot. When reading from the cloning
|
||||
// table, we need to find the original mob files by this table name. For details please see
|
||||
// cloning snapshot for mob files.
|
||||
private final byte[] refCellTags;
|
||||
|
||||
public HMobStore(final HRegion region, final HColumnDescriptor family,
|
||||
final Configuration confParam) throws IOException {
|
||||
|
@ -120,6 +129,12 @@ public class HMobStore extends HStore {
|
|||
tableLockManager = region.getRegionServerServices().getTableLockManager();
|
||||
tableLockName = MobUtils.getTableLockName(getTableName());
|
||||
}
|
||||
List<Tag> tags = new ArrayList<>(2);
|
||||
tags.add(MobConstants.MOB_REF_TAG);
|
||||
Tag tableNameTag = new ArrayBackedTag(TagType.MOB_TABLE_NAME_TAG_TYPE,
|
||||
getTableName().getName());
|
||||
tags.add(tableNameTag);
|
||||
this.refCellTags = TagUtil.fromList(tags);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -583,4 +598,8 @@ public class HMobStore extends HStore {
|
|||
public long getMobScanCellsSize() {
|
||||
return mobScanCellsSize;
|
||||
}
|
||||
|
||||
public byte[] getRefCellTags() {
|
||||
return this.refCellTags;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -97,9 +97,9 @@ public class TestHMobStore {
|
|||
private byte[] value2 = Bytes.toBytes("value2");
|
||||
private Path mobFilePath;
|
||||
private Date currentDate = new Date();
|
||||
private KeyValue seekKey1;
|
||||
private KeyValue seekKey2;
|
||||
private KeyValue seekKey3;
|
||||
private Cell seekKey1;
|
||||
private Cell seekKey2;
|
||||
private Cell seekKey3;
|
||||
private NavigableSet<byte[]> qualifiers =
|
||||
new ConcurrentSkipListSet<byte[]>(Bytes.BYTES_COMPARATOR);
|
||||
private List<Cell> expected = new ArrayList<Cell>();
|
||||
|
@ -195,9 +195,9 @@ public class TestHMobStore {
|
|||
KeyValue kv1 = new KeyValue(row, family, qf1, Long.MAX_VALUE, referenceValue);
|
||||
KeyValue kv2 = new KeyValue(row, family, qf2, Long.MAX_VALUE, referenceValue);
|
||||
KeyValue kv3 = new KeyValue(row2, family, qf3, Long.MAX_VALUE, referenceValue);
|
||||
seekKey1 = MobUtils.createMobRefKeyValue(kv1, referenceValue, tableNameTag);
|
||||
seekKey2 = MobUtils.createMobRefKeyValue(kv2, referenceValue, tableNameTag);
|
||||
seekKey3 = MobUtils.createMobRefKeyValue(kv3, referenceValue, tableNameTag);
|
||||
seekKey1 = MobUtils.createMobRefCell(kv1, referenceValue, tableNameTag);
|
||||
seekKey2 = MobUtils.createMobRefCell(kv2, referenceValue, tableNameTag);
|
||||
seekKey3 = MobUtils.createMobRefCell(kv3, referenceValue, tableNameTag);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -115,11 +115,11 @@ public class HFileTestUtil {
|
|||
try {
|
||||
// subtract 2 since iterateOnSplits doesn't include boundary keys
|
||||
for (byte[] key : Bytes.iterateOnSplits(startKey, endKey, numRows - 2)) {
|
||||
KeyValue kv = new KeyValue(key, family, qualifier, now, key);
|
||||
Cell kv = new KeyValue(key, family, qualifier, now, key);
|
||||
if (withTag) {
|
||||
// add a tag. Arbitrarily chose mob tag since we have a helper already.
|
||||
Tag tableNameTag = new ArrayBackedTag(TagType.MOB_TABLE_NAME_TAG_TYPE, key);
|
||||
kv = MobUtils.createMobRefKeyValue(kv, key, tableNameTag);
|
||||
kv = MobUtils.createMobRefCell(kv, key, tableNameTag);
|
||||
|
||||
// verify that the kv has the tag.
|
||||
Tag t = CellUtil.getTag(kv, TagType.MOB_TABLE_NAME_TAG_TYPE);
|
||||
|
|
Loading…
Reference in New Issue