HBASE-12176 WALCellCodec Encoders support for non-KeyValue Cells (Anoop Sam John)
This commit is contained in:
parent
6a37c169fc
commit
8938dc9630
|
@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
|||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.KeyValueUtil;
|
||||
import org.apache.hadoop.hbase.codec.KeyValueCodec;
|
||||
import org.apache.hadoop.hbase.io.crypto.Decryptor;
|
||||
import org.apache.hadoop.hbase.io.crypto.Encryption;
|
||||
|
@ -179,16 +180,11 @@ public class SecureWALCellCodec extends WALCellCodec {
|
|||
|
||||
@Override
|
||||
public void write(Cell cell) throws IOException {
|
||||
if (!(cell instanceof KeyValue)) throw new IOException("Cannot write non-KV cells to WAL");
|
||||
if (encryptor == null) {
|
||||
super.write(cell);
|
||||
return;
|
||||
}
|
||||
|
||||
KeyValue kv = (KeyValue)cell;
|
||||
byte[] kvBuffer = kv.getBuffer();
|
||||
int offset = kv.getOffset();
|
||||
|
||||
byte[] iv = nextIv();
|
||||
encryptor.setIv(iv);
|
||||
encryptor.reset();
|
||||
|
@ -205,23 +201,27 @@ public class SecureWALCellCodec extends WALCellCodec {
|
|||
ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
||||
OutputStream cout = encryptor.createEncryptionStream(baos);
|
||||
|
||||
int tlen = cell.getTagsLength();
|
||||
// Write the KeyValue infrastructure as VInts.
|
||||
StreamUtils.writeRawVInt32(cout, kv.getKeyLength());
|
||||
StreamUtils.writeRawVInt32(cout, kv.getValueLength());
|
||||
StreamUtils.writeRawVInt32(cout, KeyValueUtil.keyLength(cell));
|
||||
StreamUtils.writeRawVInt32(cout, cell.getValueLength());
|
||||
// To support tags
|
||||
StreamUtils.writeRawVInt32(cout, kv.getTagsLength());
|
||||
StreamUtils.writeRawVInt32(cout, tlen);
|
||||
|
||||
// Write row, qualifier, and family
|
||||
StreamUtils.writeRawVInt32(cout, kv.getRowLength());
|
||||
cout.write(kvBuffer, kv.getRowOffset(), kv.getRowLength());
|
||||
StreamUtils.writeRawVInt32(cout, kv.getFamilyLength());
|
||||
cout.write(kvBuffer, kv.getFamilyOffset(), kv.getFamilyLength());
|
||||
StreamUtils.writeRawVInt32(cout, kv.getQualifierLength());
|
||||
cout.write(kvBuffer, kv.getQualifierOffset(), kv.getQualifierLength());
|
||||
// Write the rest
|
||||
int pos = kv.getTimestampOffset();
|
||||
int remainingLength = kv.getLength() + offset - pos;
|
||||
cout.write(kvBuffer, pos, remainingLength);
|
||||
StreamUtils.writeRawVInt32(cout, cell.getRowLength());
|
||||
cout.write(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
|
||||
StreamUtils.writeRawVInt32(cout, cell.getFamilyLength());
|
||||
cout.write(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());
|
||||
StreamUtils.writeRawVInt32(cout, cell.getQualifierLength());
|
||||
cout.write(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
|
||||
// Write the rest ie. ts, type, value and tags parts
|
||||
StreamUtils.writeLong(cout, cell.getTimestamp());
|
||||
cout.write(cell.getTypeByte());
|
||||
cout.write(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
|
||||
if (tlen > 0) {
|
||||
cout.write(cell.getTagsArray(), cell.getTagsOffset(), tlen);
|
||||
}
|
||||
cout.close();
|
||||
|
||||
StreamUtils.writeRawVInt32(out, baos.size());
|
||||
|
|
|
@ -27,6 +27,7 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.KeyValueUtil;
|
||||
import org.apache.hadoop.hbase.codec.BaseDecoder;
|
||||
import org.apache.hadoop.hbase.codec.BaseEncoder;
|
||||
import org.apache.hadoop.hbase.codec.Codec;
|
||||
|
@ -189,41 +190,34 @@ public class WALCellCodec implements Codec {
|
|||
|
||||
@Override
|
||||
public void write(Cell cell) throws IOException {
|
||||
if (!(cell instanceof KeyValue)) throw new IOException("Cannot write non-KV cells to WAL");
|
||||
KeyValue kv = (KeyValue)cell;
|
||||
byte[] kvBuffer = kv.getBuffer();
|
||||
int offset = kv.getOffset();
|
||||
|
||||
// We first write the KeyValue infrastructure as VInts.
|
||||
StreamUtils.writeRawVInt32(out, kv.getKeyLength());
|
||||
StreamUtils.writeRawVInt32(out, kv.getValueLength());
|
||||
StreamUtils.writeRawVInt32(out, KeyValueUtil.keyLength(cell));
|
||||
StreamUtils.writeRawVInt32(out, cell.getValueLength());
|
||||
// To support tags
|
||||
int tagsLength = kv.getTagsLength();
|
||||
int tagsLength = cell.getTagsLength();
|
||||
StreamUtils.writeRawVInt32(out, tagsLength);
|
||||
|
||||
// Write row, qualifier, and family; use dictionary
|
||||
// compression as they're likely to have duplicates.
|
||||
write(kvBuffer, kv.getRowOffset(), kv.getRowLength(), compression.rowDict);
|
||||
write(kvBuffer, kv.getFamilyOffset(), kv.getFamilyLength(), compression.familyDict);
|
||||
write(kvBuffer, kv.getQualifierOffset(), kv.getQualifierLength(), compression.qualifierDict);
|
||||
write(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(), compression.rowDict);
|
||||
write(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
|
||||
compression.familyDict);
|
||||
write(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
|
||||
compression.qualifierDict);
|
||||
|
||||
// Write timestamp, type and value as uncompressed.
|
||||
int pos = kv.getTimestampOffset();
|
||||
int tsTypeValLen = kv.getLength() + offset - pos;
|
||||
if (tagsLength > 0) {
|
||||
tsTypeValLen = tsTypeValLen - tagsLength - KeyValue.TAGS_LENGTH_SIZE;
|
||||
}
|
||||
assert tsTypeValLen > 0;
|
||||
out.write(kvBuffer, pos, tsTypeValLen);
|
||||
StreamUtils.writeLong(out, cell.getTimestamp());
|
||||
out.write(cell.getTypeByte());
|
||||
out.write(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
|
||||
if (tagsLength > 0) {
|
||||
if (compression.tagCompressionContext != null) {
|
||||
// Write tags using Dictionary compression
|
||||
compression.tagCompressionContext.compressTags(out, kvBuffer, kv.getTagsOffset(),
|
||||
tagsLength);
|
||||
compression.tagCompressionContext.compressTags(out, cell.getTagsArray(),
|
||||
cell.getTagsOffset(), tagsLength);
|
||||
} else {
|
||||
// Tag compression is disabled within the WAL compression. Just write the tags bytes as
|
||||
// it is.
|
||||
out.write(kvBuffer, kv.getTagsOffset(), tagsLength);
|
||||
out.write(cell.getTagsArray(), cell.getTagsOffset(), tagsLength);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -339,10 +333,9 @@ public class WALCellCodec implements Codec {
|
|||
}
|
||||
@Override
|
||||
public void write(Cell cell) throws IOException {
|
||||
if (!(cell instanceof KeyValue)) throw new IOException("Cannot write non-KV cells to WAL");
|
||||
checkFlushed();
|
||||
// Make sure to write tags into WAL
|
||||
KeyValue.oswrite((KeyValue) cell, this.out, true);
|
||||
KeyValueUtil.oswrite(cell, this.out, true);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -22,7 +22,7 @@ import java.util.Collection;
|
|||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
|
@ -140,9 +140,10 @@ public class AuthResult {
|
|||
String qualifier;
|
||||
if (o instanceof byte[]) {
|
||||
qualifier = Bytes.toString((byte[])o);
|
||||
} else if (o instanceof KeyValue) {
|
||||
byte[] rawQualifier = ((KeyValue)o).getQualifier();
|
||||
qualifier = Bytes.toString(rawQualifier);
|
||||
} else if (o instanceof Cell) {
|
||||
Cell c = (Cell) o;
|
||||
qualifier = Bytes.toString(c.getQualifierArray(), c.getQualifierOffset(),
|
||||
c.getQualifierLength());
|
||||
} else {
|
||||
// Shouldn't really reach this?
|
||||
qualifier = o.toString();
|
||||
|
|
Loading…
Reference in New Issue