HBASE-17183 Handle ByteBufferCell while making TagRewriteCell.

This commit is contained in:
anoopsamjohn 2016-11-28 16:47:25 +05:30
parent 4771c217cd
commit bbb81a7ac3
1 changed files with 258 additions and 11 deletions
hbase-common/src/main/java/org/apache/hadoop/hbase

View File

@ -426,6 +426,9 @@ public final class CellUtil {
* @return A new cell which is having the extra tags also added to it.
*/
public static Cell createCell(Cell cell, byte[] tags) {
if (cell instanceof ByteBufferCell) {
return new TagRewriteByteBufferCell((ByteBufferCell) cell, tags);
}
return new TagRewriteCell(cell, tags);
}
@ -438,7 +441,7 @@ public final class CellUtil {
private static class TagRewriteCell implements ExtendedCell {
protected Cell cell;
protected byte[] tags;
private static final long HEAP_SIZE_OVERHEAD = 2 * ClassSize.REFERENCE + ClassSize.ARRAY;
private static final long HEAP_SIZE_OVERHEAD = ClassSize.OBJECT + 2 * ClassSize.REFERENCE;
/**
* @param cell The original Cell which it rewrites
@ -552,12 +555,9 @@ public final class CellUtil {
@Override
public long heapSize() {
long sum = CellUtil.estimatedHeapSizeOf(cell) - cell.getTagsLength();
sum += ClassSize.OBJECT;// this object itself
sum += (2 * ClassSize.REFERENCE);// pointers to cell and tags array
long sum = HEAP_SIZE_OVERHEAD + CellUtil.estimatedHeapSizeOf(cell) - cell.getTagsLength();
if (this.tags != null) {
sum += ClassSize.align(ClassSize.ARRAY);// "tags"
sum += this.tags.length;
sum += ClassSize.ARRAY + this.tags.length;
}
return sum;
}
@ -605,15 +605,20 @@ public final class CellUtil {
@Override
public void write(ByteBuffer buf, int offset) {
offset = KeyValueUtil.appendToByteBuffer(this.cell, buf, offset, false);
int tagsLen = this.tags.length;
assert tagsLen > 0;
int tagsLen = this.tags == null ? 0 : this.tags.length;
if (tagsLen > 0) {
offset = ByteBufferUtils.putAsShort(buf, offset, tagsLen);
ByteBufferUtils.copyFromArrayToBuffer(buf, offset, this.tags, 0, tagsLen);
}
}
@Override
public long heapOverhead() {
return ((ExtendedCell) this.cell).heapOverhead() + HEAP_SIZE_OVERHEAD;
long overhead = ((ExtendedCell) this.cell).heapOverhead() + HEAP_SIZE_OVERHEAD;
if (this.tags != null) {
overhead += ClassSize.ARRAY;
}
return overhead;
}
@Override
@ -623,6 +628,248 @@ public final class CellUtil {
}
}
@InterfaceAudience.Private
private static class TagRewriteByteBufferCell extends ByteBufferCell implements ExtendedCell {
protected ByteBufferCell cell;
protected byte[] tags;
private static final long HEAP_SIZE_OVERHEAD = ClassSize.OBJECT + 2 * ClassSize.REFERENCE;
/**
* @param cell The original ByteBufferCell which it rewrites
* @param tags the tags bytes. The array suppose to contain the tags bytes alone.
*/
public TagRewriteByteBufferCell(ByteBufferCell cell, byte[] tags) {
assert cell instanceof ExtendedCell;
assert tags != null;
this.cell = cell;
this.tags = tags;
// tag offset will be treated as 0 and length this.tags.length
if (this.cell instanceof TagRewriteByteBufferCell) {
// Cleaning the ref so that the byte[] can be GCed
((TagRewriteByteBufferCell) this.cell).tags = null;
}
}
@Override
public byte[] getRowArray() {
return this.cell.getRowArray();
}
@Override
public int getRowOffset() {
return this.cell.getRowOffset();
}
@Override
public short getRowLength() {
return this.cell.getRowLength();
}
@Override
public byte[] getFamilyArray() {
return this.cell.getFamilyArray();
}
@Override
public int getFamilyOffset() {
return this.cell.getFamilyOffset();
}
@Override
public byte getFamilyLength() {
return this.cell.getFamilyLength();
}
@Override
public byte[] getQualifierArray() {
return this.cell.getQualifierArray();
}
@Override
public int getQualifierOffset() {
return this.cell.getQualifierOffset();
}
@Override
public int getQualifierLength() {
return this.cell.getQualifierLength();
}
@Override
public long getTimestamp() {
return this.cell.getTimestamp();
}
@Override
public byte getTypeByte() {
return this.cell.getTypeByte();
}
@Override
public long getSequenceId() {
return this.cell.getSequenceId();
}
@Override
public byte[] getValueArray() {
return this.cell.getValueArray();
}
@Override
public int getValueOffset() {
return this.cell.getValueOffset();
}
@Override
public int getValueLength() {
return this.cell.getValueLength();
}
@Override
public byte[] getTagsArray() {
return this.tags;
}
@Override
public int getTagsOffset() {
return 0;
}
@Override
public int getTagsLength() {
if (null == this.tags) {
// Nulled out tags array optimization in constructor
return 0;
}
return this.tags.length;
}
@Override
public void setSequenceId(long seqId) throws IOException {
CellUtil.setSequenceId(this.cell, seqId);
}
@Override
public void setTimestamp(long ts) throws IOException {
CellUtil.setTimestamp(this.cell, ts);
}
@Override
public void setTimestamp(byte[] ts, int tsOffset) throws IOException {
CellUtil.setTimestamp(this.cell, ts, tsOffset);
}
@Override
public long heapSize() {
long sum = HEAP_SIZE_OVERHEAD + CellUtil.estimatedHeapSizeOf(cell) - cell.getTagsLength();
if (this.tags != null) {
sum += ClassSize.ARRAY + this.tags.length;
}
return sum;
}
@Override
public int write(OutputStream out, boolean withTags) throws IOException {
int len = ((ExtendedCell) this.cell).write(out, false);
if (withTags && this.tags != null) {
// Write the tagsLength 2 bytes
out.write((byte) (0xff & (this.tags.length >> 8)));
out.write((byte) (0xff & this.tags.length));
out.write(this.tags);
len += KeyValue.TAGS_LENGTH_SIZE + this.tags.length;
}
return len;
}
@Override
public int getSerializedSize(boolean withTags) {
int len = ((ExtendedCell) this.cell).getSerializedSize(false);
if (withTags && this.tags != null) {
len += KeyValue.TAGS_LENGTH_SIZE + this.tags.length;
}
return len;
}
@Override
public void write(ByteBuffer buf, int offset) {
offset = KeyValueUtil.appendToByteBuffer(this.cell, buf, offset, false);
int tagsLen = this.tags == null ? 0 : this.tags.length;
if (tagsLen > 0) {
offset = ByteBufferUtils.putAsShort(buf, offset, tagsLen);
ByteBufferUtils.copyFromArrayToBuffer(buf, offset, this.tags, 0, tagsLen);
}
}
@Override
public long heapOverhead() {
long overhead = ((ExtendedCell) this.cell).heapOverhead() + HEAP_SIZE_OVERHEAD;
if (this.tags != null) {
overhead += ClassSize.ARRAY;
}
return overhead;
}
@Override
public Cell deepClone() {
Cell clonedBaseCell = ((ExtendedCell) this.cell).deepClone();
if (clonedBaseCell instanceof ByteBufferCell) {
return new TagRewriteByteBufferCell((ByteBufferCell) clonedBaseCell, this.tags);
}
return new TagRewriteCell(clonedBaseCell, this.tags);
}
@Override
public ByteBuffer getRowByteBuffer() {
return this.cell.getRowByteBuffer();
}
@Override
public int getRowPosition() {
return this.cell.getRowPosition();
}
@Override
public ByteBuffer getFamilyByteBuffer() {
return this.cell.getFamilyByteBuffer();
}
@Override
public int getFamilyPosition() {
return this.cell.getFamilyPosition();
}
@Override
public ByteBuffer getQualifierByteBuffer() {
return this.cell.getQualifierByteBuffer();
}
@Override
public int getQualifierPosition() {
return this.cell.getQualifierPosition();
}
@Override
public ByteBuffer getValueByteBuffer() {
return this.cell.getValueByteBuffer();
}
@Override
public int getValuePosition() {
return this.cell.getValuePosition();
}
@Override
public ByteBuffer getTagsByteBuffer() {
return this.tags == null ? HConstants.EMPTY_BYTE_BUFFER : ByteBuffer.wrap(this.tags);
}
@Override
public int getTagsPosition() {
return 0;
}
}
/**
* @param cellScannerables
* @return CellScanner interface over <code>cellIterables</code>