HBASE-12176 WALCellCodec Encoders support for non-KeyValue Cells (Anoop Sam John)
This commit is contained in:
parent
8ee55fb339
commit
e3f9957830
|
@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.Cell;
|
import org.apache.hadoop.hbase.Cell;
|
||||||
import org.apache.hadoop.hbase.KeyValue;
|
import org.apache.hadoop.hbase.KeyValue;
|
||||||
|
import org.apache.hadoop.hbase.KeyValueUtil;
|
||||||
import org.apache.hadoop.hbase.codec.KeyValueCodec;
|
import org.apache.hadoop.hbase.codec.KeyValueCodec;
|
||||||
import org.apache.hadoop.hbase.io.crypto.Decryptor;
|
import org.apache.hadoop.hbase.io.crypto.Decryptor;
|
||||||
import org.apache.hadoop.hbase.io.crypto.Encryption;
|
import org.apache.hadoop.hbase.io.crypto.Encryption;
|
||||||
|
@ -179,16 +180,11 @@ public class SecureWALCellCodec extends WALCellCodec {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void write(Cell cell) throws IOException {
|
public void write(Cell cell) throws IOException {
|
||||||
if (!(cell instanceof KeyValue)) throw new IOException("Cannot write non-KV cells to WAL");
|
|
||||||
if (encryptor == null) {
|
if (encryptor == null) {
|
||||||
super.write(cell);
|
super.write(cell);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
KeyValue kv = (KeyValue)cell;
|
|
||||||
byte[] kvBuffer = kv.getBuffer();
|
|
||||||
int offset = kv.getOffset();
|
|
||||||
|
|
||||||
byte[] iv = nextIv();
|
byte[] iv = nextIv();
|
||||||
encryptor.setIv(iv);
|
encryptor.setIv(iv);
|
||||||
encryptor.reset();
|
encryptor.reset();
|
||||||
|
@ -205,23 +201,27 @@ public class SecureWALCellCodec extends WALCellCodec {
|
||||||
ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
||||||
OutputStream cout = encryptor.createEncryptionStream(baos);
|
OutputStream cout = encryptor.createEncryptionStream(baos);
|
||||||
|
|
||||||
|
int tlen = cell.getTagsLength();
|
||||||
// Write the KeyValue infrastructure as VInts.
|
// Write the KeyValue infrastructure as VInts.
|
||||||
StreamUtils.writeRawVInt32(cout, kv.getKeyLength());
|
StreamUtils.writeRawVInt32(cout, KeyValueUtil.keyLength(cell));
|
||||||
StreamUtils.writeRawVInt32(cout, kv.getValueLength());
|
StreamUtils.writeRawVInt32(cout, cell.getValueLength());
|
||||||
// To support tags
|
// To support tags
|
||||||
StreamUtils.writeRawVInt32(cout, kv.getTagsLength());
|
StreamUtils.writeRawVInt32(cout, tlen);
|
||||||
|
|
||||||
// Write row, qualifier, and family
|
// Write row, qualifier, and family
|
||||||
StreamUtils.writeRawVInt32(cout, kv.getRowLength());
|
StreamUtils.writeRawVInt32(cout, cell.getRowLength());
|
||||||
cout.write(kvBuffer, kv.getRowOffset(), kv.getRowLength());
|
cout.write(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
|
||||||
StreamUtils.writeRawVInt32(cout, kv.getFamilyLength());
|
StreamUtils.writeRawVInt32(cout, cell.getFamilyLength());
|
||||||
cout.write(kvBuffer, kv.getFamilyOffset(), kv.getFamilyLength());
|
cout.write(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());
|
||||||
StreamUtils.writeRawVInt32(cout, kv.getQualifierLength());
|
StreamUtils.writeRawVInt32(cout, cell.getQualifierLength());
|
||||||
cout.write(kvBuffer, kv.getQualifierOffset(), kv.getQualifierLength());
|
cout.write(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
|
||||||
// Write the rest
|
// Write the rest ie. ts, type, value and tags parts
|
||||||
int pos = kv.getTimestampOffset();
|
StreamUtils.writeLong(cout, cell.getTimestamp());
|
||||||
int remainingLength = kv.getLength() + offset - pos;
|
cout.write(cell.getTypeByte());
|
||||||
cout.write(kvBuffer, pos, remainingLength);
|
cout.write(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
|
||||||
|
if (tlen > 0) {
|
||||||
|
cout.write(cell.getTagsArray(), cell.getTagsOffset(), tlen);
|
||||||
|
}
|
||||||
cout.close();
|
cout.close();
|
||||||
|
|
||||||
StreamUtils.writeRawVInt32(out, baos.size());
|
StreamUtils.writeRawVInt32(out, baos.size());
|
||||||
|
|
|
@ -27,6 +27,7 @@ import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.Cell;
|
import org.apache.hadoop.hbase.Cell;
|
||||||
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
||||||
import org.apache.hadoop.hbase.KeyValue;
|
import org.apache.hadoop.hbase.KeyValue;
|
||||||
|
import org.apache.hadoop.hbase.KeyValueUtil;
|
||||||
import org.apache.hadoop.hbase.codec.BaseDecoder;
|
import org.apache.hadoop.hbase.codec.BaseDecoder;
|
||||||
import org.apache.hadoop.hbase.codec.BaseEncoder;
|
import org.apache.hadoop.hbase.codec.BaseEncoder;
|
||||||
import org.apache.hadoop.hbase.codec.Codec;
|
import org.apache.hadoop.hbase.codec.Codec;
|
||||||
|
@ -190,41 +191,34 @@ public class WALCellCodec implements Codec {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void write(Cell cell) throws IOException {
|
public void write(Cell cell) throws IOException {
|
||||||
if (!(cell instanceof KeyValue)) throw new IOException("Cannot write non-KV cells to WAL");
|
|
||||||
KeyValue kv = (KeyValue)cell;
|
|
||||||
byte[] kvBuffer = kv.getBuffer();
|
|
||||||
int offset = kv.getOffset();
|
|
||||||
|
|
||||||
// We first write the KeyValue infrastructure as VInts.
|
// We first write the KeyValue infrastructure as VInts.
|
||||||
StreamUtils.writeRawVInt32(out, kv.getKeyLength());
|
StreamUtils.writeRawVInt32(out, KeyValueUtil.keyLength(cell));
|
||||||
StreamUtils.writeRawVInt32(out, kv.getValueLength());
|
StreamUtils.writeRawVInt32(out, cell.getValueLength());
|
||||||
// To support tags
|
// To support tags
|
||||||
int tagsLength = kv.getTagsLength();
|
int tagsLength = cell.getTagsLength();
|
||||||
StreamUtils.writeRawVInt32(out, tagsLength);
|
StreamUtils.writeRawVInt32(out, tagsLength);
|
||||||
|
|
||||||
// Write row, qualifier, and family; use dictionary
|
// Write row, qualifier, and family; use dictionary
|
||||||
// compression as they're likely to have duplicates.
|
// compression as they're likely to have duplicates.
|
||||||
write(kvBuffer, kv.getRowOffset(), kv.getRowLength(), compression.rowDict);
|
write(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(), compression.rowDict);
|
||||||
write(kvBuffer, kv.getFamilyOffset(), kv.getFamilyLength(), compression.familyDict);
|
write(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
|
||||||
write(kvBuffer, kv.getQualifierOffset(), kv.getQualifierLength(), compression.qualifierDict);
|
compression.familyDict);
|
||||||
|
write(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
|
||||||
|
compression.qualifierDict);
|
||||||
|
|
||||||
// Write timestamp, type and value as uncompressed.
|
// Write timestamp, type and value as uncompressed.
|
||||||
int pos = kv.getTimestampOffset();
|
StreamUtils.writeLong(out, cell.getTimestamp());
|
||||||
int tsTypeValLen = kv.getLength() + offset - pos;
|
out.write(cell.getTypeByte());
|
||||||
if (tagsLength > 0) {
|
out.write(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
|
||||||
tsTypeValLen = tsTypeValLen - tagsLength - KeyValue.TAGS_LENGTH_SIZE;
|
|
||||||
}
|
|
||||||
assert tsTypeValLen > 0;
|
|
||||||
out.write(kvBuffer, pos, tsTypeValLen);
|
|
||||||
if (tagsLength > 0) {
|
if (tagsLength > 0) {
|
||||||
if (compression.tagCompressionContext != null) {
|
if (compression.tagCompressionContext != null) {
|
||||||
// Write tags using Dictionary compression
|
// Write tags using Dictionary compression
|
||||||
compression.tagCompressionContext.compressTags(out, kvBuffer, kv.getTagsOffset(),
|
compression.tagCompressionContext.compressTags(out, cell.getTagsArray(),
|
||||||
tagsLength);
|
cell.getTagsOffset(), tagsLength);
|
||||||
} else {
|
} else {
|
||||||
// Tag compression is disabled within the WAL compression. Just write the tags bytes as
|
// Tag compression is disabled within the WAL compression. Just write the tags bytes as
|
||||||
// it is.
|
// it is.
|
||||||
out.write(kvBuffer, kv.getTagsOffset(), tagsLength);
|
out.write(cell.getTagsArray(), cell.getTagsOffset(), tagsLength);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -340,10 +334,9 @@ public class WALCellCodec implements Codec {
|
||||||
}
|
}
|
||||||
@Override
|
@Override
|
||||||
public void write(Cell cell) throws IOException {
|
public void write(Cell cell) throws IOException {
|
||||||
if (!(cell instanceof KeyValue)) throw new IOException("Cannot write non-KV cells to WAL");
|
|
||||||
checkFlushed();
|
checkFlushed();
|
||||||
// Make sure to write tags into WAL
|
// Make sure to write tags into WAL
|
||||||
KeyValue.oswrite((KeyValue) cell, this.out, true);
|
KeyValueUtil.oswrite(cell, this.out, true);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -22,7 +22,7 @@ import java.util.Collection;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.hbase.KeyValue;
|
import org.apache.hadoop.hbase.Cell;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.security.User;
|
import org.apache.hadoop.hbase.security.User;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
@ -140,9 +140,10 @@ public class AuthResult {
|
||||||
String qualifier;
|
String qualifier;
|
||||||
if (o instanceof byte[]) {
|
if (o instanceof byte[]) {
|
||||||
qualifier = Bytes.toString((byte[])o);
|
qualifier = Bytes.toString((byte[])o);
|
||||||
} else if (o instanceof KeyValue) {
|
} else if (o instanceof Cell) {
|
||||||
byte[] rawQualifier = ((KeyValue)o).getQualifier();
|
Cell c = (Cell) o;
|
||||||
qualifier = Bytes.toString(rawQualifier);
|
qualifier = Bytes.toString(c.getQualifierArray(), c.getQualifierOffset(),
|
||||||
|
c.getQualifierLength());
|
||||||
} else {
|
} else {
|
||||||
// Shouldn't really reach this?
|
// Shouldn't really reach this?
|
||||||
qualifier = o.toString();
|
qualifier = o.toString();
|
||||||
|
|
Loading…
Reference in New Issue