HBASE-25869 WAL value compression (#3244)

WAL storage can be expensive, especially if the cell values
represented in the edits are large, consisting of blobs or
significant lengths of text. Such WALs might need to be kept around
for a fairly long time to satisfy replication constraints on a
space-limited (or space-contended) filesystem.

We have a custom dictionary compression scheme for cell metadata that
is engaged when WAL compression is enabled in site configuration.
This is fine for that application, where we can expect the universe
of values and their lengths in the custom dictionaries to be
constrained. For arbitrary cell values it is better to use one of the
available compression codecs, which are suitable for arbitrary, albeit
compressible, data.
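
Enabling the new behavior is configuration-only. A minimal sketch of what a
user would set (it mirrors the setUpBeforeClass of the new value-compression
tests below; the properties are hbase.regionserver.wal.value.enablecompression
and hbase.regionserver.wal.value.compression.type, and the codec name must be
one that Compression.getCompressionAlgorithmByName understands and that is
available on the cluster):

Configuration conf = HBaseConfiguration.create();
// Pre-existing switch: dictionary compression of WAL cell metadata.
conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true);
// New in this change: compress cell values with a general-purpose codec.
conf.setBoolean(CompressionContext.ENABLE_WAL_VALUE_COMPRESSION, true);
// Optional; GZ is the default when unset.
conf.set(CompressionContext.WAL_VALUE_COMPRESSION_TYPE, "snappy");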

Signed-off-by: Bharath Vissapragada <bharathv@apache.org>
Signed-off-by: Duo Zhang <zhangduo@apache.org>
Signed-off-by: Nick Dimiduk <ndimiduk@apache.org>
Andrew Purtell 2021-05-21 11:05:52 -07:00
parent 7b374b7c76
commit 2d591ab3c4
15 changed files with 849 additions and 44 deletions

View File

@ -76,8 +76,6 @@ import org.slf4j.LoggerFactory;
*/
@InterfaceAudience.Private
public class KeyValue implements ExtendedCell, Cloneable {
private static final ArrayList<Tag> EMPTY_ARRAY_LIST = new ArrayList<>();
private static final Logger LOG = LoggerFactory.getLogger(KeyValue.class);
public static final int FIXED_OVERHEAD = ClassSize.OBJECT + // the KeyValue object itself

View File

@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io;
import java.io.IOException;
import java.io.InputStream;
import org.apache.yetus.audience.InterfaceAudience;
/**
* This is a stream that will only supply bytes from its delegate up to a certain limit.
* When there is an attempt to set the position beyond that it will signal that the input
* is finished.
*/
@InterfaceAudience.Private
public class BoundedDelegatingInputStream extends DelegatingInputStream {
protected long limit;
protected long pos;
public BoundedDelegatingInputStream(InputStream in, long limit) {
super(in);
this.limit = limit;
this.pos = 0;
}
public void setDelegate(InputStream in, long limit) {
this.in = in;
this.limit = limit;
this.pos = 0;
}
/**
* Call the delegate's {@code read()} method if the current position is less than the limit.
* @return the byte read or -1 if the end of stream or the limit has been reached.
*/
@Override
public int read() throws IOException {
if (pos >= limit) {
return -1;
}
int result = in.read();
pos++;
return result;
}
/**
* Call the delegate's {@code read(byte[], int, int)} method if the current position is less
* than the limit.
* @param b read buffer
* @param off Start offset
* @param len The number of bytes to read
* @return the number of bytes read or -1 if the end of stream or the limit has been reached.
*/
@Override
public int read(final byte[] b, final int off, final int len) throws IOException {
if (pos >= limit) {
return -1;
}
long readLen = Math.min(len, limit - pos);
int read = in.read(b, off, (int)readLen);
if (read < 0) {
return -1;
}
pos += read;
return read;
}
/**
* Call the delegate's {@code skip(long)} method.
* @param len the number of bytes to skip
* @return the actual number of bytes skipped
*/
@Override
public long skip(final long len) throws IOException {
long skipped = in.skip(Math.min(len, limit - pos));
pos += skipped;
return skipped;
}
/**
* Call the delegate's {@code available()} method.
* @return the delegate's available bytes if the current position is less than the limit,
* or 0 otherwise
*/
@Override
public int available() throws IOException {
if (pos >= limit) {
return 0;
}
int available = in.available();
return (int) Math.min(available, limit - pos);
}
}
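
A rough usage sketch (stream contents made up): read one bounded segment, then
re-arm the same wrapper for the next segment instead of allocating a new stream.
This is exactly how the WAL value decompressor further down feeds successive
compressed value blocks to a single decompression stream.

InputStream wal = new ByteArrayInputStream("HELLOWORLD".getBytes());
BoundedDelegatingInputStream segment = new BoundedDelegatingInputStream(wal, 5);
byte[] buf = new byte[5];
segment.read(buf, 0, 5);     // returns 5; buf now holds "HELLO"
segment.read();              // returns -1: the bound is hit, not the real EOF
segment.setDelegate(wal, 5); // re-arm for the next 5 bytes of the same stream
segment.read(buf, 0, 5);     // returns 5; buf now holds "WORLD"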

View File

@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io;
import java.io.FilterInputStream;
import java.io.InputStream;
import org.apache.yetus.audience.InterfaceAudience;
/**
* An input stream that delegates all operations to another input stream.
* The delegate can be switched out for another at any time but to minimize the
* possibility of violating the InputStream contract it would be best to replace
* the delegate only once it has been fully consumed. <p> For example, a
* ByteArrayInputStream, which is implicitly bounded by the size of the underlying
* byte array can be converted into an unbounded stream fed by multiple instances
* of ByteArrayInputStream, switched out one for the other in sequence.
* <p>
* Although multithreaded access is allowed, users of this class will want to take
* care to order operations on this stream and the swap out of one delegate for
* another in a way that provides a valid view of stream contents.
*/
@InterfaceAudience.Private
public class DelegatingInputStream extends FilterInputStream {
public DelegatingInputStream(InputStream in) {
super(in);
}
public InputStream getDelegate() {
return this.in;
}
public void setDelegate(InputStream in) {
this.in = in;
}
}
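
As a concrete sketch of the javadoc example above (values made up), several
ByteArrayInputStream instances can be presented as one logical stream by
swapping the delegate once each segment has been fully consumed:

byte[] part1 = { 1, 2 }, part2 = { 3, 4 };
DelegatingInputStream in = new DelegatingInputStream(new ByteArrayInputStream(part1));
while (in.read() != -1) {
  // drain the first segment
}
// The first delegate is exhausted; switch in the next one and keep reading.
in.setDelegate(new ByteArrayInputStream(part2));
while (in.read() != -1) {
  // reads continue seamlessly with the bytes of part2
}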

View File

@ -32,6 +32,8 @@ message WALHeader {
optional bool has_tag_compression = 3;
optional string writer_cls_name = 4;
optional string cell_codec_cls_name = 5;
optional bool has_value_compression = 6;
optional uint32 value_compression_algorithm = 7;
}
/*

View File

@ -33,6 +33,8 @@ message WALHeader {
optional bool has_tag_compression = 3;
optional string writer_cls_name = 4;
optional string cell_codec_cls_name = 5;
optional bool has_value_compression = 6;
optional uint32 value_compression_algorithm = 7;
}
/*

View File

@ -32,6 +32,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.crypto.Cipher;
import org.apache.hadoop.hbase.io.crypto.Encryption;
import org.apache.hadoop.hbase.io.crypto.Encryptor;
@ -144,9 +145,22 @@ public abstract class AbstractProtobufLogWriter {
boolean doCompress = conf.getBoolean(HConstants.ENABLE_WAL_COMPRESSION, false);
if (doCompress) {
try {
final boolean useTagCompression =
conf.getBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true);
final boolean useValueCompression =
conf.getBoolean(CompressionContext.ENABLE_WAL_VALUE_COMPRESSION, false);
final Compression.Algorithm valueCompressionType =
useValueCompression ? CompressionContext.getValueCompressionAlgorithm(conf) :
Compression.Algorithm.NONE;
if (LOG.isTraceEnabled()) {
LOG.trace("Initializing compression context for {}: isRecoveredEdits={}" +
", hasTagCompression={}, hasValueCompression={}, valueCompressionType={}", path,
CommonFSUtils.isRecoveredEdits(path), useTagCompression, useValueCompression,
valueCompressionType);
}
this.compressionContext =
new CompressionContext(LRUDictionary.class, CommonFSUtils.isRecoveredEdits(path),
conf.getBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true));
useTagCompression, useValueCompression, valueCompressionType);
} catch (Exception e) {
throw new IOException("Failed to initiate CompressionContext", e);
}
@ -165,17 +179,29 @@ public abstract class AbstractProtobufLogWriter {
initOutput(fs, path, overwritable, bufferSize, replication, blocksize);
boolean doTagCompress = doCompress
&& conf.getBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true);
length.set(writeMagicAndWALHeader(ProtobufLogReader.PB_WAL_MAGIC, buildWALHeader(conf,
WALHeader.newBuilder().setHasCompression(doCompress).setHasTagCompression(doTagCompress))));
boolean doTagCompress = doCompress &&
conf.getBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true);
boolean doValueCompress = doCompress &&
conf.getBoolean(CompressionContext.ENABLE_WAL_VALUE_COMPRESSION, false);
WALHeader.Builder headerBuilder = WALHeader.newBuilder()
.setHasCompression(doCompress)
.setHasTagCompression(doTagCompress)
.setHasValueCompression(doValueCompress);
if (doValueCompress) {
headerBuilder.setValueCompressionAlgorithm(
CompressionContext.getValueCompressionAlgorithm(conf).ordinal());
}
length.set(writeMagicAndWALHeader(ProtobufLogReader.PB_WAL_MAGIC,
buildWALHeader(conf, headerBuilder)));
initAfterHeader(doCompress);
// instantiate trailer to default value.
trailer = WALTrailer.newBuilder().build();
if (LOG.isTraceEnabled()) {
LOG.trace("Initialized protobuf WAL=" + path + ", compression=" + doCompress);
LOG.trace("Initialized protobuf WAL={}, compression={}, tagCompression={}" +
", valueCompression={}", path, doCompress, doTagCompress, doValueCompress);
}
}

View File

@ -18,37 +18,155 @@
package org.apache.hadoop.hbase.regionserver.wal;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.EnumMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.io.BoundedDelegatingInputStream;
import org.apache.hadoop.hbase.io.TagCompressionContext;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.util.Dictionary;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Context that holds the various dictionaries for compression in WAL.
* <p>
* CompressionContexts are not expected to be shared among threads. Multithreaded use may
* produce unexpected results.
*/
@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
public class CompressionContext {
static final String ENABLE_WAL_TAGS_COMPRESSION =
private static final Logger LOG = LoggerFactory.getLogger(CompressionContext.class);
public static final String ENABLE_WAL_TAGS_COMPRESSION =
"hbase.regionserver.wal.tags.enablecompression";
public static final String ENABLE_WAL_VALUE_COMPRESSION =
"hbase.regionserver.wal.value.enablecompression";
public static final String WAL_VALUE_COMPRESSION_TYPE =
"hbase.regionserver.wal.value.compression.type";
public enum DictionaryIndex {
REGION, TABLE, FAMILY, QUALIFIER, ROW
}
/**
* Encapsulates the compression algorithm and its streams that we will use for value
* compression in this WAL.
*/
static class ValueCompressor {
static final int IO_BUFFER_SIZE = 4096;
private final Compression.Algorithm algorithm;
private BoundedDelegatingInputStream lowerIn;
private ByteArrayOutputStream lowerOut;
private InputStream compressedIn;
private OutputStream compressedOut;
public ValueCompressor(Compression.Algorithm algorithm) {
this.algorithm = algorithm;
}
public Compression.Algorithm getAlgorithm() {
return algorithm;
}
public byte[] compress(byte[] valueArray, int valueOffset, int valueLength)
throws IOException {
if (compressedOut == null) {
// Create the output streams here the first time around.
lowerOut = new ByteArrayOutputStream();
compressedOut = algorithm.createCompressionStream(lowerOut, algorithm.getCompressor(),
IO_BUFFER_SIZE);
} else {
lowerOut.reset();
}
compressedOut.write(valueArray, valueOffset, valueLength);
compressedOut.flush();
return lowerOut.toByteArray();
}
public int decompress(InputStream in, int inLength, byte[] outArray, int outOffset,
int outLength) throws IOException {
// Our input is a sequence of bounded byte ranges (call them segments), with
// BoundedDelegatingInputStream providing a way to switch in a new segment when the
// previous segment has been fully consumed.
// Create the input streams here the first time around.
if (compressedIn == null) {
lowerIn = new BoundedDelegatingInputStream(in, inLength);
compressedIn = algorithm.createDecompressionStream(lowerIn, algorithm.getDecompressor(),
IO_BUFFER_SIZE);
} else {
lowerIn.setDelegate(in, inLength);
}
// Caller must handle short reads.
// With current Hadoop compression codecs all 'outLength' bytes are read in here, so not
// an issue for now.
return compressedIn.read(outArray, outOffset, outLength);
}
public void clear() {
if (compressedOut != null) {
try {
compressedOut.close();
} catch (IOException e) {
LOG.warn("Exception closing compressed output stream", e);
}
}
compressedOut = null;
if (lowerOut != null) {
try {
lowerOut.close();
} catch (IOException e) {
LOG.warn("Exception closing lower output stream", e);
}
}
lowerOut = null;
if (compressedIn != null) {
try {
compressedIn.close();
} catch (IOException e) {
LOG.warn("Exception closing compressed input stream", e);
}
}
compressedIn = null;
if (lowerIn != null) {
try {
lowerIn.close();
} catch (IOException e) {
LOG.warn("Exception closing lower input stream", e);
}
}
lowerIn = null;
}
}
private final Map<DictionaryIndex, Dictionary> dictionaries =
new EnumMap<>(DictionaryIndex.class);
// Context used for compressing tags
TagCompressionContext tagCompressionContext = null;
ValueCompressor valueCompressor = null;
public CompressionContext(Class<? extends Dictionary> dictType, boolean recoveredEdits,
boolean hasTagCompression) throws SecurityException, NoSuchMethodException,
InstantiationException, IllegalAccessException, InvocationTargetException {
public CompressionContext(Class<? extends Dictionary> dictType,
boolean recoveredEdits, boolean hasTagCompression, boolean hasValueCompression,
Compression.Algorithm valueCompressionType)
throws SecurityException, NoSuchMethodException, InstantiationException,
IllegalAccessException, InvocationTargetException, IOException {
Constructor<? extends Dictionary> dictConstructor =
dictType.getConstructor();
for (DictionaryIndex dictionaryIndex : DictionaryIndex.values()) {
@ -70,12 +188,34 @@ public class CompressionContext {
if (hasTagCompression) {
tagCompressionContext = new TagCompressionContext(dictType, Short.MAX_VALUE);
}
if (hasValueCompression && valueCompressionType != null) {
valueCompressor = new ValueCompressor(valueCompressionType);
}
}
public CompressionContext(Class<? extends Dictionary> dictType, boolean recoveredEdits,
boolean hasTagCompression)
throws SecurityException, NoSuchMethodException, InstantiationException,
IllegalAccessException, InvocationTargetException, IOException {
this(dictType, recoveredEdits, hasTagCompression, false, null);
}
public boolean hasTagCompression() {
return tagCompressionContext != null;
}
public boolean hasValueCompression() {
return valueCompressor != null;
}
public Dictionary getDictionary(Enum dictIndex) {
return dictionaries.get(dictIndex);
}
public ValueCompressor getValueCompressor() {
return valueCompressor;
}
void clear() {
for(Dictionary dictionary : dictionaries.values()){
dictionary.clear();
@ -83,5 +223,20 @@ public class CompressionContext {
if (tagCompressionContext != null) {
tagCompressionContext.clear();
}
if (valueCompressor != null) {
valueCompressor.clear();
}
}
public static Compression.Algorithm getValueCompressionAlgorithm(Configuration conf) {
if (conf.getBoolean(ENABLE_WAL_VALUE_COMPRESSION, true)) {
String compressionType = conf.get(WAL_VALUE_COMPRESSION_TYPE);
if (compressionType != null) {
return Compression.getCompressionAlgorithmByName(compressionType);
}
return Compression.Algorithm.GZ;
}
return Compression.Algorithm.NONE;
}
}
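
Putting the new pieces together, a rough round trip at the ValueCompressor
level (same-package access assumed, since the class is package-private here;
the new TestWALCellCodecWithCompression test below exercises the same path
through WALCellCodec):

CompressionContext ctx = new CompressionContext(LRUDictionary.class,
  false /* recoveredEdits */, true /* tag compression */,
  true /* value compression */, Compression.Algorithm.GZ);
byte[] value = Bytes.toBytes("DEADBEEFDEADBEEFDEADBEEFDEADBEEF");
byte[] compressed = ctx.getValueCompressor().compress(value, 0, value.length);
byte[] restored = new byte[value.length];
int n = ctx.getValueCompressor().decompress(new ByteArrayInputStream(compressed),
  compressed.length, restored, 0, restored.length);
// Expect n == value.length and restored to equal value, relying on the
// "no short reads with current Hadoop codecs" assumption noted in decompress().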

View File

@ -31,6 +31,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
@ -81,6 +82,8 @@ public class ProtobufLogReader extends ReaderBase {
protected WALCellCodec.ByteStringUncompressor byteStringUncompressor;
protected boolean hasCompression = false;
protected boolean hasTagCompression = false;
protected boolean hasValueCompression = false;
protected Compression.Algorithm valueCompressionType = null;
// walEditsStopOffset is the position of the last byte to read. After reading the last WALEdit
// entry in the wal, the inputstream's position is equal to walEditsStopOffset.
private long walEditsStopOffset;
@ -227,6 +230,16 @@ public class ProtobufLogReader extends ReaderBase {
WALProtos.WALHeader header = builder.build();
this.hasCompression = header.hasHasCompression() && header.getHasCompression();
this.hasTagCompression = header.hasHasTagCompression() && header.getHasTagCompression();
this.hasValueCompression = header.hasHasValueCompression() &&
header.getHasValueCompression();
if (header.hasValueCompressionAlgorithm()) {
try {
this.valueCompressionType =
Compression.Algorithm.values()[header.getValueCompressionAlgorithm()];
} catch (ArrayIndexOutOfBoundsException e) {
throw new IOException("Invalid compression type", e);
}
}
}
this.inputStream = stream;
this.walEditsStopOffset = this.fileLength;
@ -235,7 +248,9 @@ public class ProtobufLogReader extends ReaderBase {
this.seekOnFs(currentPosition);
if (LOG.isTraceEnabled()) {
LOG.trace("After reading the trailer: walEditsStopOffset: " + this.walEditsStopOffset
+ ", fileLength: " + this.fileLength + ", " + "trailerPresent: " + (trailerPresent ? "true, size: " + trailer.getSerializedSize() : "false") + ", currentPosition: " + currentPosition);
+ ", fileLength: " + this.fileLength + ", " + "trailerPresent: " +
(trailerPresent ? "true, size: " + trailer.getSerializedSize() : "false") +
", currentPosition: " + currentPosition);
}
codecClsName = hdrCtxt.getCellCodecClsName();
@ -327,6 +342,16 @@ public class ProtobufLogReader extends ReaderBase {
return this.hasTagCompression;
}
@Override
protected boolean hasValueCompression() {
return this.hasValueCompression;
}
@Override
protected Compression.Algorithm getValueCompressionAlgorithm() {
return this.valueCompressionType;
}
@Override
protected boolean readNext(Entry entry) throws IOException {
// OriginalPosition might be < 0 on local fs; if so, it is useless to us.

View File

@ -26,6 +26,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.util.LRUDictionary;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
@ -68,8 +69,15 @@ public abstract class ReaderBase implements AbstractFSWALProvider.Reader {
// If compression is enabled, new dictionaries are created here.
try {
if (compressionContext == null) {
if (LOG.isDebugEnabled()) {
LOG.debug("Initializing compression context for {}: isRecoveredEdits={}" +
", hasTagCompression={}, hasValueCompression={}, valueCompressionType={}", path,
CommonFSUtils.isRecoveredEdits(path), hasTagCompression(), hasValueCompression(),
getValueCompressionAlgorithm());
}
compressionContext = new CompressionContext(LRUDictionary.class,
CommonFSUtils.isRecoveredEdits(path), hasTagCompression());
CommonFSUtils.isRecoveredEdits(path), hasTagCompression(),
hasValueCompression(), getValueCompressionAlgorithm());
} else {
compressionContext.clear();
}
@ -151,6 +159,16 @@ public abstract class ReaderBase implements AbstractFSWALProvider.Reader {
*/
protected abstract boolean hasTagCompression();
/**
* @return Whether value compression is enabled for this log.
*/
protected abstract boolean hasValueCompression();
/**
* @return Value compression algorithm for this log.
*/
protected abstract Compression.Algorithm getValueCompressionAlgorithm();
/**
* Read next entry.
* @param e The entry to read into.

View File

@ -222,9 +222,13 @@ public class WALCellCodec implements Codec {
static class CompressedKvEncoder extends BaseEncoder {
private final CompressionContext compression;
private final boolean hasValueCompression;
private final boolean hasTagCompression;
public CompressedKvEncoder(OutputStream out, CompressionContext compression) {
super(out);
this.compression = compression;
this.hasValueCompression = compression.hasValueCompression();
this.hasTagCompression = compression.hasTagCompression();
}
@Override
@ -241,12 +245,16 @@ public class WALCellCodec implements Codec {
compression.getDictionary(CompressionContext.DictionaryIndex.FAMILY));
PrivateCellUtil.compressQualifier(out, cell,
compression.getDictionary(CompressionContext.DictionaryIndex.QUALIFIER));
// Write timestamp, type and value as uncompressed.
// Write timestamp, type and value.
StreamUtils.writeLong(out, cell.getTimestamp());
out.write(cell.getTypeByte());
if (hasValueCompression) {
writeCompressedValue(out, cell);
} else {
PrivateCellUtil.writeValue(out, cell, cell.getValueLength());
}
if (tagsLength > 0) {
if (compression.tagCompressionContext != null) {
if (hasTagCompression) {
// Write tags using Dictionary compression
PrivateCellUtil.compressTags(out, cell, compression.tagCompressionContext);
} else {
@ -256,20 +264,31 @@ public class WALCellCodec implements Codec {
}
}
}
private void writeCompressedValue(OutputStream out, Cell cell) throws IOException {
byte[] compressed = compression.getValueCompressor().compress(cell.getValueArray(),
cell.getValueOffset(), cell.getValueLength());
StreamUtils.writeRawVInt32(out, compressed.length);
out.write(compressed);
}
}
static class CompressedKvDecoder extends BaseDecoder {
private final CompressionContext compression;
private final boolean hasValueCompression;
private final boolean hasTagCompression;
public CompressedKvDecoder(InputStream in, CompressionContext compression) {
super(in);
this.compression = compression;
this.hasValueCompression = compression.hasValueCompression();
this.hasTagCompression = compression.hasTagCompression();
}
@Override
protected Cell parseCell() throws IOException {
int keylength = StreamUtils.readRawVarint32(in);
int vlength = StreamUtils.readRawVarint32(in);
int tagsLength = StreamUtils.readRawVarint32(in);
int length = 0;
if(tagsLength == 0) {
@ -302,18 +321,27 @@ public class WALCellCodec implements Codec {
compression.getDictionary(CompressionContext.DictionaryIndex.QUALIFIER));
pos += elemLen;
// timestamp, type and value
int tsTypeValLen = length - pos;
// timestamp
long ts = StreamUtils.readLong(in);
pos = Bytes.putLong(backingArray, pos, ts);
// type and value
int typeValLen = length - pos;
if (tagsLength > 0) {
tsTypeValLen = tsTypeValLen - tagsLength - KeyValue.TAGS_LENGTH_SIZE;
typeValLen = typeValLen - tagsLength - KeyValue.TAGS_LENGTH_SIZE;
}
pos = Bytes.putByte(backingArray, pos, (byte)in.read());
int valLen = typeValLen - 1;
if (hasValueCompression) {
readCompressedValue(in, backingArray, pos, valLen);
pos += valLen;
} else {
IOUtils.readFully(in, backingArray, pos, valLen);
pos += valLen;
}
IOUtils.readFully(in, backingArray, pos, tsTypeValLen);
pos += tsTypeValLen;
// tags
if (tagsLength > 0) {
pos = Bytes.putAsShort(backingArray, pos, tagsLength);
if (compression.tagCompressionContext != null) {
if (hasTagCompression) {
compression.tagCompressionContext.uncompressTags(in, backingArray, pos, tagsLength);
} else {
IOUtils.readFully(in, backingArray, pos, tagsLength);
@ -349,6 +377,17 @@ public class WALCellCodec implements Codec {
throw new IOException("Invalid length for compresesed portion of keyvalue: " + len);
}
}
private void readCompressedValue(InputStream in, byte[] outArray, int outOffset,
int expectedLength) throws IOException {
int compressedLen = StreamUtils.readRawVarint32(in);
int read = compression.getValueCompressor().decompress(in, compressedLen, outArray,
outOffset, expectedLength);
if (read != expectedLength) {
throw new IOException("ValueCompressor state error: short read");
}
}
}
public static class EnsureKvEncoder extends BaseEncoder {

View File

@ -0,0 +1,43 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.wal;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.experimental.categories.Category;
@Category({ RegionServerTests.class, MediumTests.class })
public class TestAsyncWALReplayValueCompression extends TestAsyncWALReplay {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestAsyncWALReplayValueCompression.class);
@BeforeClass
public static void setUpBeforeClass() throws Exception {
Configuration conf = AbstractTestWALReplay.TEST_UTIL.getConfiguration();
conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true);
conf.setBoolean(CompressionContext.ENABLE_WAL_VALUE_COMPRESSION, true);
TestAsyncWALReplay.setUpBeforeClass();
}
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.regionserver.wal;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@ -25,6 +26,7 @@ import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ArrayBackedTag;
import org.apache.hadoop.hbase.ByteBufferKeyValue;
@ -35,6 +37,7 @@ import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.codec.Codec.Decoder;
import org.apache.hadoop.hbase.codec.Codec.Encoder;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.util.LRUDictionary;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
@ -62,24 +65,108 @@ public class TestWALCellCodecWithCompression {
@Test
public void testEncodeDecodeOffKVsWithTagsWithTagsCompression() throws Exception {
doTest(true, true);
doTest(true, false);
}
private void doTest(boolean compressTags, boolean offheapKV) throws Exception {
@Test
public void testValueCompressionEnabled() throws Exception {
doTest(false, true);
}
@Test
public void testValueCompression() throws Exception {
final byte[] row_1 = Bytes.toBytes("row_1");
final byte[] value_1 = new byte[20];
Bytes.zero(value_1);
final byte[] row_2 = Bytes.toBytes("row_2");
final byte[] value_2 = new byte[Bytes.SIZEOF_LONG];
Bytes.random(value_2);
final byte[] row_3 = Bytes.toBytes("row_3");
final byte[] value_3 = new byte[100];
Bytes.random(value_3);
final byte[] row_4 = Bytes.toBytes("row_4");
final byte[] value_4 = new byte[128];
fillBytes(value_4, Bytes.toBytes("DEADBEEF"));
final byte[] row_5 = Bytes.toBytes("row_5");
final byte[] value_5 = new byte[64];
fillBytes(value_5, Bytes.toBytes("CAFEBABE"));
Configuration conf = new Configuration(false);
WALCellCodec codec = new WALCellCodec(conf, new CompressionContext(LRUDictionary.class,
false, true, true, Compression.Algorithm.GZ));
ByteArrayOutputStream bos = new ByteArrayOutputStream();
Encoder encoder = codec.getEncoder(bos);
encoder.write(createKV(row_1, value_1, 0));
encoder.write(createKV(row_2, value_2, 0));
encoder.write(createKV(row_3, value_3, 0));
encoder.write(createKV(row_4, value_4, 0));
encoder.write(createKV(row_5, value_5, 0));
encoder.flush();
try (InputStream is = new ByteArrayInputStream(bos.toByteArray())) {
Decoder decoder = codec.getDecoder(is);
decoder.advance();
KeyValue kv = (KeyValue) decoder.current();
assertTrue(Bytes.equals(row_1, 0, row_1.length,
kv.getRowArray(), kv.getRowOffset(), kv.getRowLength()));
assertTrue(Bytes.equals(value_1, 0, value_1.length,
kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()));
decoder.advance();
kv = (KeyValue) decoder.current();
assertTrue(Bytes.equals(row_2, 0, row_2.length,
kv.getRowArray(), kv.getRowOffset(), kv.getRowLength()));
assertTrue(Bytes.equals(value_2, 0, value_2.length,
kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()));
decoder.advance();
kv = (KeyValue) decoder.current();
assertTrue(Bytes.equals(row_3, 0, row_3.length,
kv.getRowArray(), kv.getRowOffset(), kv.getRowLength()));
assertTrue(Bytes.equals(value_3, 0, value_3.length,
kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()));
decoder.advance();
kv = (KeyValue) decoder.current();
assertTrue(Bytes.equals(row_4, 0, row_4.length,
kv.getRowArray(), kv.getRowOffset(), kv.getRowLength()));
assertTrue(Bytes.equals(value_4, 0, value_4.length,
kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()));
decoder.advance();
kv = (KeyValue) decoder.current();
assertTrue(Bytes.equals(row_5, 0, row_5.length,
kv.getRowArray(), kv.getRowOffset(), kv.getRowLength()));
assertTrue(Bytes.equals(value_5, 0, value_5.length,
kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()));
}
}
static void fillBytes(byte[] buffer, byte[] fill) {
int offset = 0;
int remaining = buffer.length;
while (remaining > 0) {
int len = remaining < fill.length ? remaining : fill.length;
System.arraycopy(fill, 0, buffer, offset, len);
offset += len;
remaining -= len;
}
}
private void doTest(boolean compressTags, boolean offheapKV)
throws Exception {
final byte[] key = Bytes.toBytes("myRow");
final byte[] value = Bytes.toBytes("myValue");
Configuration conf = new Configuration(false);
conf.setBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, compressTags);
WALCellCodec codec = new WALCellCodec(conf, new CompressionContext(LRUDictionary.class, false,
compressTags));
WALCellCodec codec = new WALCellCodec(conf, new CompressionContext(LRUDictionary.class,
false, compressTags));
ByteArrayOutputStream bos = new ByteArrayOutputStream(1024);
Encoder encoder = codec.getEncoder(bos);
if (offheapKV) {
encoder.write(createOffheapKV(1));
encoder.write(createOffheapKV(0));
encoder.write(createOffheapKV(2));
encoder.write(createOffheapKV(key, value, 1));
encoder.write(createOffheapKV(key, value, 0));
encoder.write(createOffheapKV(key, value, 2));
} else {
encoder.write(createKV(1));
encoder.write(createKV(0));
encoder.write(createKV(2));
encoder.write(createKV(key, value, 1));
encoder.write(createKV(key, value, 0));
encoder.write(createKV(key, value, 2));
}
InputStream is = new ByteArrayInputStream(bos.toByteArray());
@ -101,11 +188,9 @@ public class TestWALCellCodecWithCompression {
assertEquals("tagValue2", Bytes.toString(Tag.cloneValue(tags.get(1))));
}
private KeyValue createKV(int noOfTags) {
byte[] row = Bytes.toBytes("myRow");
private KeyValue createKV(byte[] row, byte[] value, int noOfTags) {
byte[] cf = Bytes.toBytes("myCF");
byte[] q = Bytes.toBytes("myQualifier");
byte[] value = Bytes.toBytes("myValue");
List<Tag> tags = new ArrayList<>(noOfTags);
for (int i = 1; i <= noOfTags; i++) {
tags.add(new ArrayBackedTag((byte) i, Bytes.toBytes("tagValue" + i)));
@ -113,11 +198,9 @@ public class TestWALCellCodecWithCompression {
return new KeyValue(row, cf, q, HConstants.LATEST_TIMESTAMP, value, tags);
}
private ByteBufferKeyValue createOffheapKV(int noOfTags) {
byte[] row = Bytes.toBytes("myRow");
private ByteBufferKeyValue createOffheapKV(byte[] row, byte[] value, int noOfTags) {
byte[] cf = Bytes.toBytes("myCF");
byte[] q = Bytes.toBytes("myQualifier");
byte[] value = Bytes.toBytes("myValue");
List<Tag> tags = new ArrayList<>(noOfTags);
for (int i = 1; i <= noOfTags; i++) {
tags.add(new ArrayBackedTag((byte) i, Bytes.toBytes("tagValue" + i)));

View File

@ -0,0 +1,46 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.wal;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.experimental.categories.Category;
/**
* Enables compression and runs the TestWALReplay tests.
*/
@Category({ RegionServerTests.class, LargeTests.class })
public class TestWALReplayValueCompression extends TestWALReplay {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestWALReplayValueCompression.class);
@BeforeClass
public static void setUpBeforeClass() throws Exception {
Configuration conf = AbstractTestWALReplay.TEST_UTIL.getConfiguration();
conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true);
conf.setBoolean(CompressionContext.ENABLE_WAL_VALUE_COMPRESSION, true);
TestWALReplay.setUpBeforeClass();
}
}

View File

@ -0,0 +1,159 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.wal;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.util.Arrays;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.wal.CompressionContext;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
@RunWith(Parameterized.class)
@Category({ RegionServerTests.class, MediumTests.class })
public class TestCompressedWAL {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestCompressedWAL.class);
static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
@Rule
public TestName name = new TestName();
@Parameter
public String walProvider;
@Parameters(name = "{index}: provider={0}")
public static Iterable<Object[]> data() {
return Arrays.asList(new Object[] { "defaultProvider" }, new Object[] { "asyncfs" });
}
@BeforeClass
public static void setUpBeforeClass() throws Exception {
Configuration conf = TEST_UTIL.getConfiguration();
CommonFSUtils.setRootDir(conf, TEST_UTIL.getDataTestDirOnTestFS());
TEST_UTIL.startMiniDFSCluster(3);
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
TEST_UTIL.shutdownMiniCluster();
}
@Before
public void setUp() {
TEST_UTIL.getConfiguration().set(WALFactory.WAL_PROVIDER, walProvider);
TEST_UTIL.getConfiguration()
.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true);
}
@Test
public void testCompressedWAL() throws Exception {
TEST_UTIL.getConfiguration()
.setBoolean(CompressionContext.ENABLE_WAL_VALUE_COMPRESSION, false);
doTest();
}
@Test
public void testCompressedWALWithValueCompression() throws Exception {
TEST_UTIL.getConfiguration()
.setBoolean(CompressionContext.ENABLE_WAL_VALUE_COMPRESSION, true);
doTest();
}
private void doTest() throws Exception {
TableName tableName = TableName.valueOf(name.getMethodName().replaceAll("[^a-zA-Z0-9]", "_"));
NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
scopes.put(tableName.getName(), 0);
RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tableName).build();
final int total = 1000;
final byte[] row = Bytes.toBytes("row");
final byte[] family = Bytes.toBytes("family");
final byte[] value = Bytes.toBytes("Test value");
FileSystem fs = TEST_UTIL.getDFSCluster().getFileSystem();
final WALFactory wals =
new WALFactory(TEST_UTIL.getConfiguration(), tableName.getNameAsString());
// Write the WAL
final WAL wal = wals.getWAL(regionInfo);
MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
for (int i = 0; i < total; i++) {
WALEdit kvs = new WALEdit();
kvs.add(new KeyValue(row, family, Bytes.toBytes(i), value));
wal.appendData(regionInfo, new WALKeyImpl(regionInfo.getEncodedNameAsBytes(), tableName,
System.currentTimeMillis(), mvcc, scopes), kvs);
}
wal.sync();
final Path walPath = AbstractFSWALProvider.getCurrentFileName(wal);
wals.shutdown();
// Confirm the WAL can be read back
WAL.Reader reader = wals.createReader(TEST_UTIL.getTestFileSystem(), walPath);
int count = 0;
WAL.Entry entry = new WAL.Entry();
while (reader.next(entry) != null) {
count++;
List<Cell> cells = entry.getEdit().getCells();
assertTrue("Should be one KV per WALEdit", cells.size() == 1);
for (Cell cell: cells) {
assertTrue("Incorrect row", Bytes.equals(cell.getRowArray(), cell.getRowOffset(),
cell.getRowLength(), row, 0, row.length));
assertTrue("Incorrect family", Bytes.equals(cell.getFamilyArray(), cell.getFamilyOffset(),
cell.getFamilyLength(), family, 0, family.length));
assertTrue("Incorrect value", Bytes.equals(cell.getValueArray(), cell.getValueOffset(),
cell.getValueLength(), value, 0, value.length));
}
}
assertEquals("Should have read back as many KVs as written", total, count);
reader.close();
}
}

View File

@ -0,0 +1,44 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.wal;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.regionserver.wal.CompressionContext;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.experimental.categories.Category;
@Category({RegionServerTests.class, LargeTests.class})
public class TestWALSplitValueCompression extends TestWALSplit {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestWALSplitValueCompression.class);
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.getConfiguration()
.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true);
TEST_UTIL.getConfiguration()
.setBoolean(CompressionContext.ENABLE_WAL_VALUE_COMPRESSION, true);
TestWALSplit.setUpBeforeClass();
}
}