diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java index 81dd59e6478..157ad1b6ed7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java @@ -24,29 +24,25 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; import java.util.List; -import java.util.Map; -import java.util.NavigableMap; -import java.util.UUID; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellScanner; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.PrivateCellUtil; -import org.apache.hadoop.hbase.wal.WALKeyImpl; +import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec; import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.io.SizedCellScanner; import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.ipc.HBaseRpcControllerImpl; import org.apache.hadoop.hbase.wal.WALEdit; -import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; -import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.WAL.Entry; +import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; + @InterfaceAudience.Private public class ReplicationProtbufUtil { /** @@ -81,7 +77,7 @@ public class ReplicationProtbufUtil { * found. */ public static Pair - buildReplicateWALEntryRequest(final Entry[] entries) { + buildReplicateWALEntryRequest(final Entry[] entries) throws IOException { return buildReplicateWALEntryRequest(entries, null, null, null, null); } @@ -97,53 +93,30 @@ public class ReplicationProtbufUtil { */ public static Pair buildReplicateWALEntryRequest(final Entry[] entries, byte[] encodedRegionName, - String replicationClusterId, Path sourceBaseNamespaceDir, Path sourceHFileArchiveDir) { + String replicationClusterId, Path sourceBaseNamespaceDir, Path sourceHFileArchiveDir) + throws IOException { // Accumulate all the Cells seen in here. List> allCells = new ArrayList<>(entries.length); int size = 0; - WALProtos.FamilyScope.Builder scopeBuilder = WALProtos.FamilyScope.newBuilder(); AdminProtos.WALEntry.Builder entryBuilder = AdminProtos.WALEntry.newBuilder(); AdminProtos.ReplicateWALEntryRequest.Builder builder = AdminProtos.ReplicateWALEntryRequest.newBuilder(); - HBaseProtos.UUID.Builder uuidBuilder = HBaseProtos.UUID.newBuilder(); + for (Entry entry: entries) { entryBuilder.clear(); - // TODO: this duplicates a lot in WALKeyImpl#getBuilder - WALProtos.WALKey.Builder keyBuilder = entryBuilder.getKeyBuilder(); - WALKeyImpl key = entry.getKey(); - keyBuilder.setEncodedRegionName( - UnsafeByteOperations.unsafeWrap(encodedRegionName == null - ? key.getEncodedRegionName() - : encodedRegionName)); - keyBuilder.setTableName(UnsafeByteOperations.unsafeWrap(key.getTableName().getName())); - long sequenceId = key.getSequenceId(); - keyBuilder.setLogSequenceNumber(sequenceId); - keyBuilder.setWriteTime(key.getWriteTime()); - if (key.getNonce() != HConstants.NO_NONCE) { - keyBuilder.setNonce(key.getNonce()); + WALProtos.WALKey.Builder keyBuilder; + try { + keyBuilder = entry.getKey().getBuilder(WALCellCodec.getNoneCompressor()); + } catch (IOException e) { + throw new IOException( + "There should not throw exception since NoneCompressor do not throw any exceptions", e); } - if (key.getNonceGroup() != HConstants.NO_NONCE) { - keyBuilder.setNonceGroup(key.getNonceGroup()); - } - for(UUID clusterId : key.getClusterIds()) { - uuidBuilder.setLeastSigBits(clusterId.getLeastSignificantBits()); - uuidBuilder.setMostSigBits(clusterId.getMostSignificantBits()); - keyBuilder.addClusterIds(uuidBuilder.build()); - } - if (key.getOrigLogSeqNum() > 0) { - keyBuilder.setOrigSequenceNumber(key.getOrigLogSeqNum()); + if(encodedRegionName != null){ + keyBuilder.setEncodedRegionName( + UnsafeByteOperations.unsafeWrap(encodedRegionName)); } + entryBuilder.setKey(keyBuilder.build()); WALEdit edit = entry.getEdit(); - NavigableMap scopes = key.getReplicationScopes(); - if (scopes != null && !scopes.isEmpty()) { - for (Map.Entry scope: scopes.entrySet()) { - scopeBuilder.setFamily(UnsafeByteOperations.unsafeWrap(scope.getKey())); - WALProtos.ScopeType scopeType = - WALProtos.ScopeType.valueOf(scope.getValue().intValue()); - scopeBuilder.setScopeType(scopeType); - keyBuilder.addScopes(scopeBuilder.build()); - } - } List cells = edit.getCells(); // Add up the size. It is used later serializing out the kvs. for (Cell cell: cells) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java index 50ac1019848..ae084a438e9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java @@ -183,6 +183,8 @@ public abstract class AbstractProtobufLogWriter { this.cellEncoder = codec.getEncoder(getOutputStreamForCellEncoder()); if (doCompress) { this.compressor = codec.getByteStringCompressor(); + } else { + this.compressor = WALCellCodec.getNoneCompressor(); } } @@ -198,6 +200,7 @@ public abstract class AbstractProtobufLogWriter { this.cellEncoder = codec.getEncoder(getOutputStreamForCellEncoder()); // We do not support compression this.compressionContext = null; + this.compressor = WALCellCodec.getNoneCompressor(); } else { initAfterHeader0(doCompress); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java index abdc24e9c6e..6368fb745f2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java @@ -120,7 +120,6 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter @Override public void append(Entry entry) { int buffered = output.buffered(); - entry.setCompressionContext(compressionContext); try { entry.getKey(). getBuilder(compressor).setFollowingKvCount(entry.getEdit().size()).build() diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java index 22ac4995817..16866e18a7c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java @@ -20,6 +20,8 @@ package org.apache.hadoop.hbase.regionserver.wal; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; +import java.util.EnumMap; +import java.util.Map; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.yetus.audience.InterfaceAudience; @@ -35,12 +37,12 @@ public class CompressionContext { static final String ENABLE_WAL_TAGS_COMPRESSION = "hbase.regionserver.wal.tags.enablecompression"; - // visible only for WALKey, until we move everything into o.a.h.h.wal - public final Dictionary regionDict; - public final Dictionary tableDict; - public final Dictionary familyDict; - final Dictionary qualifierDict; - final Dictionary rowDict; + public enum DictionaryIndex { + REGION, TABLE, FAMILY, QUALIFIER, ROW + } + + private final Map dictionaries = + new EnumMap<>(DictionaryIndex.class); // Context used for compressing tags TagCompressionContext tagCompressionContext = null; @@ -49,33 +51,35 @@ public class CompressionContext { InstantiationException, IllegalAccessException, InvocationTargetException { Constructor dictConstructor = dictType.getConstructor(); - regionDict = dictConstructor.newInstance(); - tableDict = dictConstructor.newInstance(); - familyDict = dictConstructor.newInstance(); - qualifierDict = dictConstructor.newInstance(); - rowDict = dictConstructor.newInstance(); - if (recoveredEdits) { - // This will never change - regionDict.init(1); - tableDict.init(1); - } else { - regionDict.init(Short.MAX_VALUE); - tableDict.init(Short.MAX_VALUE); + for (DictionaryIndex dictionaryIndex : DictionaryIndex.values()) { + Dictionary newDictionary = dictConstructor.newInstance(); + dictionaries.put(dictionaryIndex, newDictionary); } - rowDict.init(Short.MAX_VALUE); - familyDict.init(Byte.MAX_VALUE); - qualifierDict.init(Byte.MAX_VALUE); + if(recoveredEdits) { + getDictionary(DictionaryIndex.REGION).init(1); + getDictionary(DictionaryIndex.TABLE).init(1); + } else { + getDictionary(DictionaryIndex.REGION).init(Short.MAX_VALUE); + getDictionary(DictionaryIndex.TABLE).init(Short.MAX_VALUE); + } + + getDictionary(DictionaryIndex.ROW).init(Short.MAX_VALUE); + getDictionary(DictionaryIndex.FAMILY).init(Byte.MAX_VALUE); + getDictionary(DictionaryIndex.QUALIFIER).init(Byte.MAX_VALUE); + if (hasTagCompression) { tagCompressionContext = new TagCompressionContext(dictType, Short.MAX_VALUE); } } + public Dictionary getDictionary(Enum dictIndex) { + return dictionaries.get(dictIndex); + } + void clear() { - regionDict.clear(); - tableDict.clear(); - familyDict.clear(); - qualifierDict.clear(); - rowDict.clear(); + for(Dictionary dictionary : dictionaries.values()){ + dictionary.clear(); + } if (tagCompressionContext != null) { tagCompressionContext.clear(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java index 5d8d8c00ab5..83398bda943 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java @@ -312,6 +312,8 @@ public class ProtobufLogReader extends ReaderBase { this.cellDecoder = codec.getDecoder(this.inputStream); if (this.hasCompression) { this.byteStringUncompressor = codec.getByteStringUncompressor(); + } else { + this.byteStringUncompressor = WALCellCodec.getNoneUncompressor(); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java index 2852047b5ba..b4e2cbf196d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java @@ -47,7 +47,6 @@ public class ProtobufLogWriter extends AbstractProtobufLogWriter @Override public void append(Entry entry) throws IOException { - entry.setCompressionContext(compressionContext); entry.getKey().getBuilder(compressor). setFollowingKvCount(entry.getEdit().size()).build().writeDelimitedTo(output); for (Cell cell : entry.getEdit().getCells()) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java index 4338f6d8e4d..27d40b25a20 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java @@ -92,9 +92,6 @@ public abstract class ReaderBase implements AbstractFSWALProvider.Reader { if (e == null) { e = new Entry(); } - if (compressionContext != null) { - e.setCompressionContext(compressionContext); - } boolean hasEntry = false; try { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureProtobufLogReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureProtobufLogReader.java index b1f17ad4ea0..e43d140826c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureProtobufLogReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureProtobufLogReader.java @@ -141,6 +141,7 @@ public class SecureProtobufLogReader extends ProtobufLogReader { this.cellDecoder = codec.getDecoder(this.inputStream); // We do not support compression with WAL encryption this.compressionContext = null; + this.byteStringUncompressor = WALCellCodec.getNoneUncompressor(); this.hasCompression = false; } else { super.initAfterCompression(cellCodecClsName); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java index 2922a10d51b..34d83f7817a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java @@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.hadoop.io.IOUtils; import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; +import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; /** @@ -62,12 +63,6 @@ public class WALCellCodec implements Codec { public static final String WAL_CELL_CODEC_CLASS_KEY = "hbase.regionserver.wal.codec"; protected final CompressionContext compression; - protected final ByteStringUncompressor statelessUncompressor = new ByteStringUncompressor() { - @Override - public byte[] uncompress(ByteString data, Dictionary dict) throws IOException { - return WALCellCodec.uncompressByteString(data, dict); - } - }; /** * All subclasses must implement a no argument constructor @@ -132,17 +127,32 @@ public class WALCellCodec implements Codec { } public interface ByteStringCompressor { - ByteString compress(byte[] data, Dictionary dict) throws IOException; + ByteString compress(byte[] data, Enum dictIndex) throws IOException; } public interface ByteStringUncompressor { - byte[] uncompress(ByteString data, Dictionary dict) throws IOException; + byte[] uncompress(ByteString data, Enum dictIndex) throws IOException; + } + + static class StatelessUncompressor implements ByteStringUncompressor { + CompressionContext compressionContext; + + public StatelessUncompressor(CompressionContext compressionContext) { + this.compressionContext = compressionContext; + } + + @Override + public byte[] uncompress(ByteString data, Enum dictIndex) throws IOException { + return WALCellCodec.uncompressByteString(data, compressionContext.getDictionary(dictIndex)); + } } - // TODO: it sucks that compression context is in WAL.Entry. It'd be nice if it was here. - // Dictionary could be gotten by enum; initially, based on enum, context would create - // an array of dictionaries. static class BaosAndCompressor extends ByteArrayOutputStream implements ByteStringCompressor { + private CompressionContext compressionContext; + + public BaosAndCompressor(CompressionContext compressionContext) { + this.compressionContext = compressionContext; + } public ByteString toByteString() { // We need this copy to create the ByteString as the byte[] 'buf' is not immutable. We reuse // them. @@ -150,8 +160,8 @@ public class WALCellCodec implements Codec { } @Override - public ByteString compress(byte[] data, Dictionary dict) throws IOException { - writeCompressed(data, dict); + public ByteString compress(byte[] data, Enum dictIndex) throws IOException { + writeCompressed(data, dictIndex); // We need this copy to create the ByteString as the byte[] 'buf' is not immutable. We reuse // them. ByteString result = ByteString.copyFrom(this.buf, 0, this.count); @@ -159,7 +169,8 @@ public class WALCellCodec implements Codec { return result; } - private void writeCompressed(byte[] data, Dictionary dict) throws IOException { + private void writeCompressed(byte[] data, Enum dictIndex) throws IOException { + Dictionary dict = compressionContext.getDictionary(dictIndex); assert dict != null; short dictIdx = dict.findEntry(data, 0, data.length); if (dictIdx == Dictionary.NOT_IN_DICTIONARY) { @@ -172,6 +183,22 @@ public class WALCellCodec implements Codec { } } + static class NoneCompressor implements ByteStringCompressor { + + @Override + public ByteString compress(byte[] data, Enum dictIndex) { + return UnsafeByteOperations.unsafeWrap(data); + } + } + + static class NoneUncompressor implements ByteStringUncompressor { + + @Override + public byte[] uncompress(ByteString data, Enum dictIndex) { + return data.toByteArray(); + } + } + private static byte[] uncompressByteString(ByteString bs, Dictionary dict) throws IOException { InputStream in = bs.newInput(); byte status = (byte)in.read(); @@ -209,9 +236,12 @@ public class WALCellCodec implements Codec { // To support tags int tagsLength = cell.getTagsLength(); StreamUtils.writeRawVInt32(out, tagsLength); - PrivateCellUtil.compressRow(out, cell, compression.rowDict); - PrivateCellUtil.compressFamily(out, cell, compression.familyDict); - PrivateCellUtil.compressQualifier(out, cell, compression.qualifierDict); + PrivateCellUtil.compressRow(out, cell, + compression.getDictionary(CompressionContext.DictionaryIndex.ROW)); + PrivateCellUtil.compressFamily(out, cell, + compression.getDictionary(CompressionContext.DictionaryIndex.FAMILY)); + PrivateCellUtil.compressQualifier(out, cell, + compression.getDictionary(CompressionContext.DictionaryIndex.QUALIFIER)); // Write timestamp, type and value as uncompressed. StreamUtils.writeLong(out, cell.getTimestamp()); out.write(cell.getTypeByte()); @@ -255,19 +285,22 @@ public class WALCellCodec implements Codec { pos = Bytes.putInt(backingArray, pos, vlength); // the row - int elemLen = readIntoArray(backingArray, pos + Bytes.SIZEOF_SHORT, compression.rowDict); + int elemLen = readIntoArray(backingArray, pos + Bytes.SIZEOF_SHORT, + compression.getDictionary(CompressionContext.DictionaryIndex.ROW)); checkLength(elemLen, Short.MAX_VALUE); pos = Bytes.putShort(backingArray, pos, (short)elemLen); pos += elemLen; // family - elemLen = readIntoArray(backingArray, pos + Bytes.SIZEOF_BYTE, compression.familyDict); + elemLen = readIntoArray(backingArray, pos + Bytes.SIZEOF_BYTE, + compression.getDictionary(CompressionContext.DictionaryIndex.FAMILY)); checkLength(elemLen, Byte.MAX_VALUE); pos = Bytes.putByte(backingArray, pos, (byte)elemLen); pos += elemLen; // qualifier - elemLen = readIntoArray(backingArray, pos, compression.qualifierDict); + elemLen = readIntoArray(backingArray, pos, + compression.getDictionary(CompressionContext.DictionaryIndex.QUALIFIER)); pos += elemLen; // timestamp, type and value @@ -354,12 +387,18 @@ public class WALCellCodec implements Codec { } public ByteStringCompressor getByteStringCompressor() { - // TODO: ideally this should also encapsulate compressionContext - return new BaosAndCompressor(); + return new BaosAndCompressor(compression); } public ByteStringUncompressor getByteStringUncompressor() { - // TODO: ideally this should also encapsulate compressionContext - return this.statelessUncompressor; + return new StatelessUncompressor(compression); + } + + public static ByteStringCompressor getNoneCompressor() { + return new NoneCompressor(); + } + + public static ByteStringUncompressor getNoneUncompressor() { + return new NoneUncompressor(); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ClusterMarkingEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ClusterMarkingEntryFilter.java index 6dc50019c89..5f92bbf3a65 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ClusterMarkingEntryFilter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ClusterMarkingEntryFilter.java @@ -60,8 +60,6 @@ public class ClusterMarkingEntryFilter implements WALEntryFilter { if (edit != null && !edit.isEmpty()) { // Mark that the current cluster has the change logKey.addClusterId(clusterId); - // We need to set the CC to null else it will be compressed when sent to the sink - entry.setCompressionContext(null); return entry; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java index 3c857378a8e..4fb30febf28 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java @@ -283,7 +283,9 @@ public interface WAL extends Closeable, WALFileLengthProvider { * * @param compressionContext * Compression context + * @deprecated deparcated since hbase 2.1.0 */ + @Deprecated public void setCompressionContext(CompressionContext compressionContext) { key.setCompressionContext(compressionContext); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKeyImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKeyImpl.java index 8828239f48d..ba380597d87 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKeyImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKeyImpl.java @@ -37,7 +37,6 @@ import org.apache.yetus.audience.InterfaceAudience; import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; -import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; @@ -117,8 +116,6 @@ public class WALKeyImpl implements WALKey { */ private MultiVersionConcurrencyControl.WriteEntry writeEntry; - private CompressionContext compressionContext; - public WALKeyImpl() { init(null, null, 0L, HConstants.LATEST_TIMESTAMP, new ArrayList<>(), HConstants.NO_NONCE, HConstants.NO_NONCE, null, null); @@ -334,9 +331,11 @@ public class WALKeyImpl implements WALKey { /** * @param compressionContext Compression context to use + * @deprecated deparcated since hbase 2.1.0 */ + @Deprecated public void setCompressionContext(CompressionContext compressionContext) { - this.compressionContext = compressionContext; + //do nothing } /** @return encoded region name */ @@ -517,18 +516,13 @@ public class WALKeyImpl implements WALKey { this.encodedRegionName = encodedRegionName; } - public WALProtos.WALKey.Builder getBuilder( - WALCellCodec.ByteStringCompressor compressor) throws IOException { + public WALProtos.WALKey.Builder getBuilder(WALCellCodec.ByteStringCompressor compressor) + throws IOException { WALProtos.WALKey.Builder builder = WALProtos.WALKey.newBuilder(); - if (compressionContext == null) { - builder.setEncodedRegionName(UnsafeByteOperations.unsafeWrap(this.encodedRegionName)); - builder.setTableName(UnsafeByteOperations.unsafeWrap(this.tablename.getName())); - } else { - builder.setEncodedRegionName(compressor.compress(this.encodedRegionName, - compressionContext.regionDict)); - builder.setTableName(compressor.compress(this.tablename.getName(), - compressionContext.tableDict)); - } + builder.setEncodedRegionName( + compressor.compress(this.encodedRegionName, CompressionContext.DictionaryIndex.REGION)); + builder.setTableName( + compressor.compress(this.tablename.getName(), CompressionContext.DictionaryIndex.TABLE)); builder.setLogSequenceNumber(getSequenceId()); builder.setWriteTime(writeTime); if (this.origLogSeqNum > 0) { @@ -548,29 +542,22 @@ public class WALKeyImpl implements WALKey { } if (replicationScope != null) { for (Map.Entry e : replicationScope.entrySet()) { - ByteString family = (compressionContext == null) - ? UnsafeByteOperations.unsafeWrap(e.getKey()) - : compressor.compress(e.getKey(), compressionContext.familyDict); - builder.addScopes(FamilyScope.newBuilder() - .setFamily(family).setScopeType(ScopeType.forNumber(e.getValue()))); + ByteString family = + compressor.compress(e.getKey(), CompressionContext.DictionaryIndex.FAMILY); + builder.addScopes(FamilyScope.newBuilder().setFamily(family) + .setScopeType(ScopeType.forNumber(e.getValue()))); } } return builder; } public void readFieldsFromPb(WALProtos.WALKey walKey, - WALCellCodec.ByteStringUncompressor uncompressor) - throws IOException { - if (this.compressionContext != null) { - this.encodedRegionName = uncompressor.uncompress( - walKey.getEncodedRegionName(), compressionContext.regionDict); - byte[] tablenameBytes = uncompressor.uncompress( - walKey.getTableName(), compressionContext.tableDict); - this.tablename = TableName.valueOf(tablenameBytes); - } else { - this.encodedRegionName = walKey.getEncodedRegionName().toByteArray(); - this.tablename = TableName.valueOf(walKey.getTableName().toByteArray()); - } + WALCellCodec.ByteStringUncompressor uncompressor) throws IOException { + this.encodedRegionName = uncompressor.uncompress(walKey.getEncodedRegionName(), + CompressionContext.DictionaryIndex.REGION); + byte[] tablenameBytes = + uncompressor.uncompress(walKey.getTableName(), CompressionContext.DictionaryIndex.TABLE); + this.tablename = TableName.valueOf(tablenameBytes); clusterIds.clear(); for (HBaseProtos.UUID clusterId : walKey.getClusterIdsList()) { clusterIds.add(new UUID(clusterId.getMostSigBits(), clusterId.getLeastSigBits())); @@ -585,14 +572,14 @@ public class WALKeyImpl implements WALKey { if (walKey.getScopesCount() > 0) { this.replicationScope = new TreeMap<>(Bytes.BYTES_COMPARATOR); for (FamilyScope scope : walKey.getScopesList()) { - byte[] family = (compressionContext == null) ? scope.getFamily().toByteArray() : - uncompressor.uncompress(scope.getFamily(), compressionContext.familyDict); + byte[] family = + uncompressor.uncompress(scope.getFamily(), CompressionContext.DictionaryIndex.FAMILY); this.replicationScope.put(family, scope.getScopeType().getNumber()); } } setSequenceId(walKey.getLogSequenceNumber()); this.writeTime = walKey.getWriteTime(); - if(walKey.hasOrigSequenceNumber()) { + if (walKey.hasOrigSequenceNumber()) { this.origLogSeqNum = walKey.getOrigSequenceNumber(); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/FaultyProtobufLogReader.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/FaultyProtobufLogReader.java index f1508e5bc93..2c74f80b3a8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/FaultyProtobufLogReader.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/FaultyProtobufLogReader.java @@ -45,9 +45,6 @@ public class FaultyProtobufLogReader extends ProtobufLogReader { boolean b; do { Entry e = new Entry(); - if (compressionContext != null) { - e.setCompressionContext(compressionContext); - } b = readNext(e); nextQueue.offer(e); numberOfFileEntries++;