HBASE-20625 refactor some WALCellCodec related code

Signed-off-by: Guanghao Zhang <zghao@apache.org>
Author: jingyuntian, 2018-06-12 16:17:13 +08:00; committed by Guanghao Zhang
Parent: 161dc7c7f3
Commit: bde9f08a83
13 changed files with 140 additions and 139 deletions

ReplicationProtbufUtil.java

@@ -24,29 +24,25 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.UUID;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.io.SizedCellScanner;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.HBaseRpcControllerImpl;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WAL.Entry;
@InterfaceAudience.Private
public class ReplicationProtbufUtil {
/**
@@ -81,7 +77,7 @@ public class ReplicationProtbufUtil {
* found.
*/
public static Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner>
buildReplicateWALEntryRequest(final Entry[] entries) {
buildReplicateWALEntryRequest(final Entry[] entries) throws IOException {
return buildReplicateWALEntryRequest(entries, null, null, null, null);
}
@@ -97,53 +93,30 @@
*/
public static Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner>
buildReplicateWALEntryRequest(final Entry[] entries, byte[] encodedRegionName,
String replicationClusterId, Path sourceBaseNamespaceDir, Path sourceHFileArchiveDir) {
String replicationClusterId, Path sourceBaseNamespaceDir, Path sourceHFileArchiveDir)
throws IOException {
// Accumulate all the Cells seen in here.
List<List<? extends Cell>> allCells = new ArrayList<>(entries.length);
int size = 0;
WALProtos.FamilyScope.Builder scopeBuilder = WALProtos.FamilyScope.newBuilder();
AdminProtos.WALEntry.Builder entryBuilder = AdminProtos.WALEntry.newBuilder();
AdminProtos.ReplicateWALEntryRequest.Builder builder =
AdminProtos.ReplicateWALEntryRequest.newBuilder();
HBaseProtos.UUID.Builder uuidBuilder = HBaseProtos.UUID.newBuilder();
for (Entry entry: entries) {
entryBuilder.clear();
// TODO: this duplicates a lot in WALKeyImpl#getBuilder
WALProtos.WALKey.Builder keyBuilder = entryBuilder.getKeyBuilder();
WALKeyImpl key = entry.getKey();
keyBuilder.setEncodedRegionName(
UnsafeByteOperations.unsafeWrap(encodedRegionName == null
? key.getEncodedRegionName()
: encodedRegionName));
keyBuilder.setTableName(UnsafeByteOperations.unsafeWrap(key.getTableName().getName()));
long sequenceId = key.getSequenceId();
keyBuilder.setLogSequenceNumber(sequenceId);
keyBuilder.setWriteTime(key.getWriteTime());
if (key.getNonce() != HConstants.NO_NONCE) {
keyBuilder.setNonce(key.getNonce());
WALProtos.WALKey.Builder keyBuilder;
try {
keyBuilder = entry.getKey().getBuilder(WALCellCodec.getNoneCompressor());
} catch (IOException e) {
throw new IOException(
"Should not happen since the NoneCompressor does not throw any exceptions", e);
}
if (key.getNonceGroup() != HConstants.NO_NONCE) {
keyBuilder.setNonceGroup(key.getNonceGroup());
}
for(UUID clusterId : key.getClusterIds()) {
uuidBuilder.setLeastSigBits(clusterId.getLeastSignificantBits());
uuidBuilder.setMostSigBits(clusterId.getMostSignificantBits());
keyBuilder.addClusterIds(uuidBuilder.build());
}
if (key.getOrigLogSeqNum() > 0) {
keyBuilder.setOrigSequenceNumber(key.getOrigLogSeqNum());
if (encodedRegionName != null) {
keyBuilder.setEncodedRegionName(
UnsafeByteOperations.unsafeWrap(encodedRegionName));
}
entryBuilder.setKey(keyBuilder.build());
WALEdit edit = entry.getEdit();
NavigableMap<byte[], Integer> scopes = key.getReplicationScopes();
if (scopes != null && !scopes.isEmpty()) {
for (Map.Entry<byte[], Integer> scope: scopes.entrySet()) {
scopeBuilder.setFamily(UnsafeByteOperations.unsafeWrap(scope.getKey()));
WALProtos.ScopeType scopeType =
WALProtos.ScopeType.valueOf(scope.getValue().intValue());
scopeBuilder.setScopeType(scopeType);
keyBuilder.addScopes(scopeBuilder.build());
}
}
List<Cell> cells = edit.getCells();
// Add up the size. It is used later when serializing out the kvs.
for (Cell cell: cells) {
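
Taken together, this hunk replaces the hand-rolled WALKey-to-protobuf field copying with a call to WALKeyImpl#getBuilder plus the new no-op compressor. A minimal sketch of the resulting call pattern, assuming HBase 2.1 internals on the classpath (the helper class itself is hypothetical, not part of the commit):

import java.io.IOException;
import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;

// Hypothetical helper that mirrors the new buildReplicateWALEntryRequest logic.
final class WalKeyToProtoSketch {
  static WALProtos.WALKey toProto(Entry entry, byte[] sinkEncodedRegionName) throws IOException {
    // getNoneCompressor() is a pass-through ByteStringCompressor, so the key is
    // serialized uncompressed, exactly like the removed field-by-field copying.
    WALProtos.WALKey.Builder keyBuilder =
        entry.getKey().getBuilder(WALCellCodec.getNoneCompressor());
    if (sinkEncodedRegionName != null) {
      // Replication may substitute the target region's encoded name.
      keyBuilder.setEncodedRegionName(UnsafeByteOperations.unsafeWrap(sinkEncodedRegionName));
    }
    return keyBuilder.build();
  }
}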

AbstractProtobufLogWriter.java

@@ -183,6 +183,8 @@ public abstract class AbstractProtobufLogWriter {
this.cellEncoder = codec.getEncoder(getOutputStreamForCellEncoder());
if (doCompress) {
this.compressor = codec.getByteStringCompressor();
} else {
this.compressor = WALCellCodec.getNoneCompressor();
}
}
@@ -198,6 +200,7 @@
this.cellEncoder = codec.getEncoder(getOutputStreamForCellEncoder());
// We do not support compression
this.compressionContext = null;
this.compressor = WALCellCodec.getNoneCompressor();
} else {
initAfterHeader0(doCompress);
}
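
These two hunks establish one invariant: after init, the writer's compressor field is never null — a dictionary-backed compressor when WAL compression is on, the new NoneCompressor otherwise (ProtobufLogReader and SecureProtobufLogReader below make the symmetric change with getNoneUncompressor()). A sketch of that selection logic, as an illustration rather than the writer's exact code:

import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec.ByteStringCompressor;

// Hypothetical extraction of the writer's compressor selection.
final class CompressorSelectionSketch {
  static ByteStringCompressor pick(WALCellCodec codec, boolean doCompress) {
    // Callers such as append() can now use the compressor unconditionally.
    return doCompress ? codec.getByteStringCompressor() : WALCellCodec.getNoneCompressor();
  }
}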

AsyncProtobufLogWriter.java

@@ -120,7 +120,6 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
@Override
public void append(Entry entry) {
int buffered = output.buffered();
entry.setCompressionContext(compressionContext);
try {
entry.getKey().
getBuilder(compressor).setFollowingKvCount(entry.getEdit().size()).build()

CompressionContext.java

@@ -20,6 +20,8 @@ package org.apache.hadoop.hbase.regionserver.wal;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.EnumMap;
import java.util.Map;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.yetus.audience.InterfaceAudience;
@@ -35,12 +37,12 @@ public class CompressionContext {
static final String ENABLE_WAL_TAGS_COMPRESSION =
"hbase.regionserver.wal.tags.enablecompression";
// visible only for WALKey, until we move everything into o.a.h.h.wal
public final Dictionary regionDict;
public final Dictionary tableDict;
public final Dictionary familyDict;
final Dictionary qualifierDict;
final Dictionary rowDict;
public enum DictionaryIndex {
REGION, TABLE, FAMILY, QUALIFIER, ROW
}
private final Map<DictionaryIndex, Dictionary> dictionaries =
new EnumMap<>(DictionaryIndex.class);
// Context used for compressing tags
TagCompressionContext tagCompressionContext = null;
@@ -49,33 +51,35 @@
InstantiationException, IllegalAccessException, InvocationTargetException {
Constructor<? extends Dictionary> dictConstructor =
dictType.getConstructor();
regionDict = dictConstructor.newInstance();
tableDict = dictConstructor.newInstance();
familyDict = dictConstructor.newInstance();
qualifierDict = dictConstructor.newInstance();
rowDict = dictConstructor.newInstance();
if (recoveredEdits) {
// This will never change
regionDict.init(1);
tableDict.init(1);
} else {
regionDict.init(Short.MAX_VALUE);
tableDict.init(Short.MAX_VALUE);
for (DictionaryIndex dictionaryIndex : DictionaryIndex.values()) {
Dictionary newDictionary = dictConstructor.newInstance();
dictionaries.put(dictionaryIndex, newDictionary);
}
rowDict.init(Short.MAX_VALUE);
familyDict.init(Byte.MAX_VALUE);
qualifierDict.init(Byte.MAX_VALUE);
if (recoveredEdits) {
getDictionary(DictionaryIndex.REGION).init(1);
getDictionary(DictionaryIndex.TABLE).init(1);
} else {
getDictionary(DictionaryIndex.REGION).init(Short.MAX_VALUE);
getDictionary(DictionaryIndex.TABLE).init(Short.MAX_VALUE);
}
getDictionary(DictionaryIndex.ROW).init(Short.MAX_VALUE);
getDictionary(DictionaryIndex.FAMILY).init(Byte.MAX_VALUE);
getDictionary(DictionaryIndex.QUALIFIER).init(Byte.MAX_VALUE);
if (hasTagCompression) {
tagCompressionContext = new TagCompressionContext(dictType, Short.MAX_VALUE);
}
}
public Dictionary getDictionary(Enum dictIndex) {
return dictionaries.get(dictIndex);
}
void clear() {
regionDict.clear();
tableDict.clear();
familyDict.clear();
qualifierDict.clear();
rowDict.clear();
for (Dictionary dictionary : dictionaries.values()) {
dictionary.clear();
}
if (tagCompressionContext != null) {
tagCompressionContext.clear();
}
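
The refactor replaces five separate Dictionary fields (regionDict, tableDict, familyDict, qualifierDict, rowDict) with an EnumMap keyed by the new DictionaryIndex enum, so every consumer now goes through getDictionary(...). A small usage sketch, assuming HBase 2.1's public CompressionContext constructor (the wrapper class is hypothetical):

import org.apache.hadoop.hbase.io.util.Dictionary;
import org.apache.hadoop.hbase.io.util.LRUDictionary;
import org.apache.hadoop.hbase.regionserver.wal.CompressionContext;

// Hypothetical lookup helper replacing the removed direct field access.
final class DictionaryLookupSketch {
  static Dictionary rowDict() throws Exception { // reflective ctor exceptions collapsed
    // Arguments: dictionary implementation, recoveredEdits, hasTagCompression.
    CompressionContext ctx = new CompressionContext(LRUDictionary.class, false, true);
    return ctx.getDictionary(CompressionContext.DictionaryIndex.ROW); // was ctx.rowDict
  }
}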

ProtobufLogReader.java

@@ -312,6 +312,8 @@ public class ProtobufLogReader extends ReaderBase {
this.cellDecoder = codec.getDecoder(this.inputStream);
if (this.hasCompression) {
this.byteStringUncompressor = codec.getByteStringUncompressor();
} else {
this.byteStringUncompressor = WALCellCodec.getNoneUncompressor();
}
}

ProtobufLogWriter.java

@@ -47,7 +47,6 @@ public class ProtobufLogWriter extends AbstractProtobufLogWriter
@Override
public void append(Entry entry) throws IOException {
entry.setCompressionContext(compressionContext);
entry.getKey().getBuilder(compressor).
setFollowingKvCount(entry.getEdit().size()).build().writeDelimitedTo(output);
for (Cell cell : entry.getEdit().getCells()) {

ReaderBase.java

@@ -92,9 +92,6 @@ public abstract class ReaderBase implements AbstractFSWALProvider.Reader {
if (e == null) {
e = new Entry();
}
if (compressionContext != null) {
e.setCompressionContext(compressionContext);
}
boolean hasEntry = false;
try {

SecureProtobufLogReader.java

@@ -141,6 +141,7 @@ public class SecureProtobufLogReader extends ProtobufLogReader {
this.cellDecoder = codec.getDecoder(this.inputStream);
// We do not support compression with WAL encryption
this.compressionContext = null;
this.byteStringUncompressor = WALCellCodec.getNoneUncompressor();
this.hasCompression = false;
} else {
super.initAfterCompression(cellCodecClsName);

WALCellCodec.java

@@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.io.IOUtils;
import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
/**
@@ -62,12 +63,6 @@ public class WALCellCodec implements Codec {
public static final String WAL_CELL_CODEC_CLASS_KEY = "hbase.regionserver.wal.codec";
protected final CompressionContext compression;
protected final ByteStringUncompressor statelessUncompressor = new ByteStringUncompressor() {
@Override
public byte[] uncompress(ByteString data, Dictionary dict) throws IOException {
return WALCellCodec.uncompressByteString(data, dict);
}
};
/**
* <b>All subclasses must implement a no argument constructor</b>
@@ -132,17 +127,32 @@ public class WALCellCodec implements Codec {
}
public interface ByteStringCompressor {
ByteString compress(byte[] data, Dictionary dict) throws IOException;
ByteString compress(byte[] data, Enum dictIndex) throws IOException;
}
public interface ByteStringUncompressor {
byte[] uncompress(ByteString data, Dictionary dict) throws IOException;
byte[] uncompress(ByteString data, Enum dictIndex) throws IOException;
}
static class StatelessUncompressor implements ByteStringUncompressor {
CompressionContext compressionContext;
public StatelessUncompressor(CompressionContext compressionContext) {
this.compressionContext = compressionContext;
}
@Override
public byte[] uncompress(ByteString data, Enum dictIndex) throws IOException {
return WALCellCodec.uncompressByteString(data, compressionContext.getDictionary(dictIndex));
}
}
// TODO: it sucks that compression context is in WAL.Entry. It'd be nice if it was here.
// Dictionary could be gotten by enum; initially, based on enum, context would create
// an array of dictionaries.
static class BaosAndCompressor extends ByteArrayOutputStream implements ByteStringCompressor {
private CompressionContext compressionContext;
public BaosAndCompressor(CompressionContext compressionContext) {
this.compressionContext = compressionContext;
}
public ByteString toByteString() {
// We need this copy to create the ByteString as the byte[] 'buf' is not immutable. We reuse
// them.
@@ -150,8 +160,8 @@ public class WALCellCodec implements Codec {
}
@Override
public ByteString compress(byte[] data, Dictionary dict) throws IOException {
writeCompressed(data, dict);
public ByteString compress(byte[] data, Enum dictIndex) throws IOException {
writeCompressed(data, dictIndex);
// We need this copy to create the ByteString as the byte[] 'buf' is not immutable. We reuse
// them.
ByteString result = ByteString.copyFrom(this.buf, 0, this.count);
@@ -159,7 +169,8 @@ public class WALCellCodec implements Codec {
return result;
}
private void writeCompressed(byte[] data, Dictionary dict) throws IOException {
private void writeCompressed(byte[] data, Enum dictIndex) throws IOException {
Dictionary dict = compressionContext.getDictionary(dictIndex);
assert dict != null;
short dictIdx = dict.findEntry(data, 0, data.length);
if (dictIdx == Dictionary.NOT_IN_DICTIONARY) {
@@ -172,6 +183,22 @@ public class WALCellCodec implements Codec {
}
}
static class NoneCompressor implements ByteStringCompressor {
@Override
public ByteString compress(byte[] data, Enum dictIndex) {
return UnsafeByteOperations.unsafeWrap(data);
}
}
static class NoneUncompressor implements ByteStringUncompressor {
@Override
public byte[] uncompress(ByteString data, Enum dictIndex) {
return data.toByteArray();
}
}
private static byte[] uncompressByteString(ByteString bs, Dictionary dict) throws IOException {
InputStream in = bs.newInput();
byte status = (byte)in.read();
@@ -209,9 +236,12 @@ public class WALCellCodec implements Codec {
// To support tags
int tagsLength = cell.getTagsLength();
StreamUtils.writeRawVInt32(out, tagsLength);
PrivateCellUtil.compressRow(out, cell, compression.rowDict);
PrivateCellUtil.compressFamily(out, cell, compression.familyDict);
PrivateCellUtil.compressQualifier(out, cell, compression.qualifierDict);
PrivateCellUtil.compressRow(out, cell,
compression.getDictionary(CompressionContext.DictionaryIndex.ROW));
PrivateCellUtil.compressFamily(out, cell,
compression.getDictionary(CompressionContext.DictionaryIndex.FAMILY));
PrivateCellUtil.compressQualifier(out, cell,
compression.getDictionary(CompressionContext.DictionaryIndex.QUALIFIER));
// Write timestamp, type and value as uncompressed.
StreamUtils.writeLong(out, cell.getTimestamp());
out.write(cell.getTypeByte());
@@ -255,19 +285,22 @@ public class WALCellCodec implements Codec {
pos = Bytes.putInt(backingArray, pos, vlength);
// the row
int elemLen = readIntoArray(backingArray, pos + Bytes.SIZEOF_SHORT, compression.rowDict);
int elemLen = readIntoArray(backingArray, pos + Bytes.SIZEOF_SHORT,
compression.getDictionary(CompressionContext.DictionaryIndex.ROW));
checkLength(elemLen, Short.MAX_VALUE);
pos = Bytes.putShort(backingArray, pos, (short)elemLen);
pos += elemLen;
// family
elemLen = readIntoArray(backingArray, pos + Bytes.SIZEOF_BYTE, compression.familyDict);
elemLen = readIntoArray(backingArray, pos + Bytes.SIZEOF_BYTE,
compression.getDictionary(CompressionContext.DictionaryIndex.FAMILY));
checkLength(elemLen, Byte.MAX_VALUE);
pos = Bytes.putByte(backingArray, pos, (byte)elemLen);
pos += elemLen;
// qualifier
elemLen = readIntoArray(backingArray, pos, compression.qualifierDict);
elemLen = readIntoArray(backingArray, pos,
compression.getDictionary(CompressionContext.DictionaryIndex.QUALIFIER));
pos += elemLen;
// timestamp, type and value
@@ -354,12 +387,18 @@ public class WALCellCodec implements Codec {
}
public ByteStringCompressor getByteStringCompressor() {
// TODO: ideally this should also encapsulate compressionContext
return new BaosAndCompressor();
return new BaosAndCompressor(compression);
}
public ByteStringUncompressor getByteStringUncompressor() {
// TODO: ideally this should also encapsulate compressionContext
return this.statelessUncompressor;
return new StatelessUncompressor(compression);
}
public static ByteStringCompressor getNoneCompressor() {
return new NoneCompressor();
}
public static ByteStringUncompressor getNoneUncompressor() {
return new NoneUncompressor();
}
}
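
The compressor/uncompressor interfaces now take a dictionary-index enum instead of a Dictionary, and the new None implementations ignore it entirely: compress() wraps the byte[] unchanged and uncompress() copies the bytes back out. A round-trip sketch of that contract (the class name is illustrative):

import java.io.IOException;
import org.apache.hadoop.hbase.regionserver.wal.CompressionContext;
import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;

// Hypothetical round trip through the no-op compressor pair.
final class NoneCodecRoundTripSketch {
  static byte[] roundTrip(byte[] data) throws IOException {
    ByteString wire = WALCellCodec.getNoneCompressor()
        .compress(data, CompressionContext.DictionaryIndex.REGION); // index is ignored
    return WALCellCodec.getNoneUncompressor()
        .uncompress(wire, CompressionContext.DictionaryIndex.REGION);
  }
}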

ClusterMarkingEntryFilter.java

@@ -60,8 +60,6 @@ public class ClusterMarkingEntryFilter implements WALEntryFilter {
if (edit != null && !edit.isEmpty()) {
// Mark that the current cluster has the change
logKey.addClusterId(clusterId);
// We need to set the CC to null else it will be compressed when sent to the sink
entry.setCompressionContext(null);
return entry;
}
}

WAL.java

@@ -283,7 +283,9 @@ public interface WAL extends Closeable, WALFileLengthProvider {
*
* @param compressionContext
* Compression context
* @deprecated since hbase 2.1.0
*/
@Deprecated
public void setCompressionContext(CompressionContext compressionContext) {
key.setCompressionContext(compressionContext);
}

WALKeyImpl.java

@@ -37,7 +37,6 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
@@ -117,8 +116,6 @@ public class WALKeyImpl implements WALKey {
*/
private MultiVersionConcurrencyControl.WriteEntry writeEntry;
private CompressionContext compressionContext;
public WALKeyImpl() {
init(null, null, 0L, HConstants.LATEST_TIMESTAMP,
new ArrayList<>(), HConstants.NO_NONCE, HConstants.NO_NONCE, null, null);
@@ -334,9 +331,11 @@
/**
* @param compressionContext Compression context to use
* @deprecated since hbase 2.1.0
*/
@Deprecated
public void setCompressionContext(CompressionContext compressionContext) {
this.compressionContext = compressionContext;
// do nothing
}
/** @return encoded region name */
@@ -517,18 +516,13 @@
this.encodedRegionName = encodedRegionName;
}
public WALProtos.WALKey.Builder getBuilder(
WALCellCodec.ByteStringCompressor compressor) throws IOException {
public WALProtos.WALKey.Builder getBuilder(WALCellCodec.ByteStringCompressor compressor)
throws IOException {
WALProtos.WALKey.Builder builder = WALProtos.WALKey.newBuilder();
if (compressionContext == null) {
builder.setEncodedRegionName(UnsafeByteOperations.unsafeWrap(this.encodedRegionName));
builder.setTableName(UnsafeByteOperations.unsafeWrap(this.tablename.getName()));
} else {
builder.setEncodedRegionName(compressor.compress(this.encodedRegionName,
compressionContext.regionDict));
builder.setTableName(compressor.compress(this.tablename.getName(),
compressionContext.tableDict));
}
builder.setEncodedRegionName(
compressor.compress(this.encodedRegionName, CompressionContext.DictionaryIndex.REGION));
builder.setTableName(
compressor.compress(this.tablename.getName(), CompressionContext.DictionaryIndex.TABLE));
builder.setLogSequenceNumber(getSequenceId());
builder.setWriteTime(writeTime);
if (this.origLogSeqNum > 0) {
@@ -548,29 +542,22 @@
}
if (replicationScope != null) {
for (Map.Entry<byte[], Integer> e : replicationScope.entrySet()) {
ByteString family = (compressionContext == null)
? UnsafeByteOperations.unsafeWrap(e.getKey())
: compressor.compress(e.getKey(), compressionContext.familyDict);
builder.addScopes(FamilyScope.newBuilder()
.setFamily(family).setScopeType(ScopeType.forNumber(e.getValue())));
ByteString family =
compressor.compress(e.getKey(), CompressionContext.DictionaryIndex.FAMILY);
builder.addScopes(FamilyScope.newBuilder().setFamily(family)
.setScopeType(ScopeType.forNumber(e.getValue())));
}
}
return builder;
}
public void readFieldsFromPb(WALProtos.WALKey walKey,
WALCellCodec.ByteStringUncompressor uncompressor)
throws IOException {
if (this.compressionContext != null) {
this.encodedRegionName = uncompressor.uncompress(
walKey.getEncodedRegionName(), compressionContext.regionDict);
byte[] tablenameBytes = uncompressor.uncompress(
walKey.getTableName(), compressionContext.tableDict);
this.tablename = TableName.valueOf(tablenameBytes);
} else {
this.encodedRegionName = walKey.getEncodedRegionName().toByteArray();
this.tablename = TableName.valueOf(walKey.getTableName().toByteArray());
}
WALCellCodec.ByteStringUncompressor uncompressor) throws IOException {
this.encodedRegionName = uncompressor.uncompress(walKey.getEncodedRegionName(),
CompressionContext.DictionaryIndex.REGION);
byte[] tablenameBytes =
uncompressor.uncompress(walKey.getTableName(), CompressionContext.DictionaryIndex.TABLE);
this.tablename = TableName.valueOf(tablenameBytes);
clusterIds.clear();
for (HBaseProtos.UUID clusterId : walKey.getClusterIdsList()) {
clusterIds.add(new UUID(clusterId.getMostSigBits(), clusterId.getLeastSigBits()));
@@ -585,14 +572,14 @@
if (walKey.getScopesCount() > 0) {
this.replicationScope = new TreeMap<>(Bytes.BYTES_COMPARATOR);
for (FamilyScope scope : walKey.getScopesList()) {
byte[] family = (compressionContext == null) ? scope.getFamily().toByteArray() :
uncompressor.uncompress(scope.getFamily(), compressionContext.familyDict);
byte[] family =
uncompressor.uncompress(scope.getFamily(), CompressionContext.DictionaryIndex.FAMILY);
this.replicationScope.put(family, scope.getScopeType().getNumber());
}
}
setSequenceId(walKey.getLogSequenceNumber());
this.writeTime = walKey.getWriteTime();
if(walKey.hasOrigSequenceNumber()) {
if (walKey.hasOrigSequenceNumber()) {
this.origLogSeqNum = walKey.getOrigSequenceNumber();
}
}
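
WALKeyImpl no longer carries a CompressionContext; compression state lives entirely in the ByteStringCompressor/ByteStringUncompressor passed to getBuilder and readFieldsFromPb. A sketch of a protobuf round trip with the no-op pair (the constructor choice and class name are illustrative, assuming HBase 2.1's WALKeyImpl constructors):

import java.io.IOException;
import java.util.ArrayList;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WALKeyImpl;

// Hypothetical round trip: WALKeyImpl -> protobuf -> WALKeyImpl, no compression.
final class WalKeyRoundTripSketch {
  static WALKeyImpl roundTrip() throws IOException {
    WALKeyImpl key = new WALKeyImpl(Bytes.toBytes("region"), TableName.valueOf("t1"),
        42L, System.currentTimeMillis(), new ArrayList<>(),
        HConstants.NO_NONCE, HConstants.NO_NONCE, null, null);
    WALProtos.WALKey proto = key.getBuilder(WALCellCodec.getNoneCompressor()).build();
    WALKeyImpl copy = new WALKeyImpl();
    copy.readFieldsFromPb(proto, WALCellCodec.getNoneUncompressor());
    return copy;
  }
}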

FaultyProtobufLogReader.java

@@ -45,9 +45,6 @@ public class FaultyProtobufLogReader extends ProtobufLogReader {
boolean b;
do {
Entry e = new Entry();
if (compressionContext != null) {
e.setCompressionContext(compressionContext);
}
b = readNext(e);
nextQueue.offer(e);
numberOfFileEntries++;