[jira] [HBASE-5469] Add baseline compression efficiency to DataBlockEncodingTool
Summary: DataBlockEncodingTool currently does not provide baseline compression efficiency, e.g. a Hadoop compression codec applied to unencoded data. For example, if we are using LZO to compress blocks, we would like to have the following columns in the report (possibly as percentages of raw data size):

Baseline K+V in block cache | Baseline K+V on disk (LZO-compressed) | K+V DataBlockEncoded in block cache | K+V DataBlockEncoded + LZO-compressed (on disk)

Background: we never store compressed blocks in the block cache, but we always store encoded data blocks in the block cache if data block encoding is enabled for the column family.

This patch also contains multiple bugfixes and improvements to DataBlockEncodingTool, including the presentation format, memory requirements (reduced 3x), and fixed handling of compression.

Test Plan:
* Run unit tests.
* Run DataBlockEncodingTool on a variety of real-world HFiles.

Reviewers: JIRA, dhruba, tedyu, stack, heyongqiang

Reviewed By: tedyu

Differential Revision: https://reviews.facebook.net/D2409

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1304626 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent 1e6c227883
commit 86962028dd
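As a rough illustration of how the four report columns described in the summary map onto the EncodedDataBlock API reworked by this patch, the sketch below computes the four sizes from a buffer of raw KeyValue bytes, the way DataBlockEncodingTool gathers them. It is not part of the patch; the wrapper class and method are hypothetical, only the EncodedDataBlock calls come from the change itself.

  import java.io.IOException;

  import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
  import org.apache.hadoop.hbase.io.encoding.EncodedDataBlock;
  import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
  import org.apache.hadoop.io.compress.Compressor;

  public class ReportColumnsSketch {
    /** Returns {raw in cache, raw on disk, encoded in cache, encoded + compressed on disk}. */
    static long[] reportColumns(byte[] rawKVs, boolean includesMemstoreTS,
        DataBlockEncoding encoding, Algorithm algo, Compressor compressor)
        throws IOException {
      long rawInCache = rawKVs.length;                      // baseline K+V in block cache
      long rawOnDisk = EncodedDataBlock.getCompressedSize(  // baseline K+V on disk
          algo, compressor, rawKVs, 0, rawKVs.length);
      EncodedDataBlock block = new EncodedDataBlock(
          encoding.getEncoder(), includesMemstoreTS, encoding, rawKVs);
      long encodedInCache = block.getSize();                // encoded K+V in block cache
      long encodedOnDisk = block.getEncodedCompressedSize(  // encoded K+V + codec on disk
          algo, compressor);
      return new long[] { rawInCache, rawOnDisk, encodedInCache, encodedOnDisk };
    }
  }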
@@ -40,9 +40,9 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
private static int INITIAL_KEY_BUFFER_SIZE = 512;

@Override
public ByteBuffer uncompressKeyValues(DataInputStream source,
public ByteBuffer decodeKeyValues(DataInputStream source,
boolean includesMemstoreTS) throws IOException {
return uncompressKeyValues(source, 0, 0, includesMemstoreTS);
return decodeKeyValues(source, 0, 0, includesMemstoreTS);
}

protected static class SeekerState {
@@ -329,11 +329,10 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
ByteBuffer in, boolean includesMemstoreTS) throws IOException;

@Override
public void compressKeyValues(ByteBuffer in,
public void encodeKeyValues(ByteBuffer in,
boolean includesMemstoreTS,
HFileBlockEncodingContext blkEncodingCtx) throws IOException {
if (!(blkEncodingCtx.getClass().getName().equals(
HFileBlockDefaultEncodingContext.class.getName()))) {
if (blkEncodingCtx.getClass() != HFileBlockDefaultEncodingContext.class) {
throw new IOException (this.getClass().getName() + " only accepts "
+ HFileBlockDefaultEncodingContext.class.getName() + " as the " +
"encoding context.");
@@ -41,7 +41,7 @@ public class CopyKeyDataBlockEncoder extends BufferedDataBlockEncoder {
}

@Override
public ByteBuffer uncompressKeyValues(DataInputStream source,
public ByteBuffer decodeKeyValues(DataInputStream source,
int preserveHeaderLength, int skipLastBytes, boolean includesMemstoreTS)
throws IOException {
int decompressedSize = source.readInt();
@@ -37,13 +37,13 @@ import org.apache.hadoop.io.RawComparator;
*
* After encoding, it also optionally compresses the encoded data if a
* compression algorithm is specified in HFileBlockEncodingContext argument of
* {@link #compressKeyValues(ByteBuffer, boolean, HFileBlockEncodingContext)}.
* {@link #encodeKeyValues(ByteBuffer, boolean, HFileBlockEncodingContext)}.
*/
@InterfaceAudience.Private
public interface DataBlockEncoder {

/**
* Compress KeyValues. It will first encode key value pairs, and then
* Encodes KeyValues. It will first encode key value pairs, and then
* optionally do the compression for the encoded data.
*
* @param in
@@ -57,24 +57,24 @@ public interface DataBlockEncoder {
* @throws IOException
* If there is an error writing to output stream.
*/
public void compressKeyValues(
public void encodeKeyValues(
ByteBuffer in, boolean includesMemstoreTS,
HFileBlockEncodingContext encodingContext) throws IOException;

/**
* Uncompress.
* Decode.
* @param source Compressed stream of KeyValues.
* @param includesMemstoreTS true if including memstore timestamp after every
* key-value pair
* @return Uncompressed block of KeyValues.
* @throws IOException If there is an error in source.
*/
public ByteBuffer uncompressKeyValues(DataInputStream source,
public ByteBuffer decodeKeyValues(DataInputStream source,
boolean includesMemstoreTS) throws IOException;

/**
* Uncompress.
* @param source Compressed stream of KeyValues.
* @param source encoded stream of KeyValues.
* @param allocateHeaderLength allocate this many bytes for the header.
* @param skipLastBytes Do not copy n last bytes.
* @param includesMemstoreTS true if including memstore timestamp after every
@@ -82,7 +82,7 @@ public interface DataBlockEncoder {
* @return Uncompressed block of KeyValues.
* @throws IOException If there is an error in source.
*/
public ByteBuffer uncompressKeyValues(DataInputStream source,
public ByteBuffer decodeKeyValues(DataInputStream source,
int allocateHeaderLength, int skipLastBytes, boolean includesMemstoreTS)
throws IOException;
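For reference, a minimal round trip through the renamed interface (encodeKeyValues in place of compressKeyValues, decodeKeyValues in place of uncompressKeyValues) could look like the sketch below. It is modeled on the test changes later in this commit and on EncodedDataBlock.getIterator(); the wrapper class is hypothetical and not part of the patch.

  import java.io.ByteArrayInputStream;
  import java.io.DataInputStream;
  import java.io.IOException;
  import java.nio.ByteBuffer;

  import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
  import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
  import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
  import org.apache.hadoop.hbase.io.hfile.Compression;
  import org.apache.hadoop.hbase.io.hfile.HFileBlock;
  import org.apache.hadoop.hbase.util.Bytes;

  public class EncodeDecodeRoundTripSketch {
    static ByteBuffer roundTrip(ByteBuffer rawKVs, DataBlockEncoding encoding,
        boolean includesMemstoreTS) throws IOException {
      DataBlockEncoder encoder = encoding.getEncoder();
      HFileBlockEncodingContext ctx = encoder.newDataBlockEncodingContext(
          Compression.Algorithm.NONE, encoding, HFileBlock.DUMMY_HEADER);

      // Encode (formerly compressKeyValues).
      encoder.encodeKeyValues(rawKVs, includesMemstoreTS, ctx);
      byte[] encodedWithHeader = ctx.getUncompressedBytesWithHeader();

      // Skip the dummy block header plus the 2-byte encoding id, as
      // EncodedDataBlock.getIterator() does, then decode (formerly
      // uncompressKeyValues).
      int skip = ctx.getHeaderSize() + Bytes.SIZEOF_SHORT;
      DataInputStream dis = new DataInputStream(new ByteArrayInputStream(
          encodedWithHeader, skip, encodedWithHeader.length - skip));
      return encoder.decodeKeyValues(dis, includesMemstoreTS);
    }
  }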
@@ -335,7 +335,7 @@ public class DiffKeyDeltaEncoder extends BufferedDataBlockEncoder {
}

@Override
public ByteBuffer uncompressKeyValues(DataInputStream source,
public ByteBuffer decodeKeyValues(DataInputStream source,
int allocHeaderLength, int skipLastBytes, boolean includesMemstoreTS)
throws IOException {
int decompressedSize = source.readInt();
@@ -19,7 +19,9 @@ package org.apache.hadoop.hbase.io.encoding;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Iterator;
@@ -27,46 +29,44 @@ import org.apache.commons.lang.NotImplementedException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.hfile.Compression;
import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
import org.apache.hadoop.hbase.io.hfile.HFileBlock;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.compress.Compressor;

import com.google.common.base.Preconditions;
import com.google.common.io.NullOutputStream;

/**
* Encapsulates a data block compressed using a particular encoding algorithm.
* Useful for testing and benchmarking.
*/
@InterfaceAudience.Private
public class EncodedDataBlock {
private static final int BUFFER_SIZE = 4 * 1024;
protected DataBlockEncoder dataBlockEncoder;
ByteArrayOutputStream uncompressedOutputStream;
ByteBuffer uncompressedBuffer;
private byte[] cacheCompressData;
private byte[] rawKVs;
private ByteBuffer rawBuffer;
private DataBlockEncoder dataBlockEncoder;

private byte[] cachedEncodedData;
private boolean includesMemstoreTS;

private final HFileBlockEncodingContext encodingCxt;
private final HFileBlockEncodingContext encodingCtx;

/**
* Create a buffer which will be encoded using dataBlockEncoder.
* @param dataBlockEncoder Algorithm used for compression.
* @param encoding encoding type used
* @param rawKVs
*/
public EncodedDataBlock(DataBlockEncoder dataBlockEncoder,
boolean includesMemstoreTS, DataBlockEncoding encoding) {
boolean includesMemstoreTS, DataBlockEncoding encoding, byte[] rawKVs) {
Preconditions.checkNotNull(encoding,
"Cannot create encoded data block with null encoder");
this.dataBlockEncoder = dataBlockEncoder;
uncompressedOutputStream = new ByteArrayOutputStream(BUFFER_SIZE);
encodingCxt =
encodingCtx =
dataBlockEncoder.newDataBlockEncodingContext(Compression.Algorithm.NONE,
encoding, HFileBlock.DUMMY_HEADER);
}

/**
* Add KeyValue and compress it.
* @param kv Item to be added and compressed.
*/
public void addKv(KeyValue kv) {
cacheCompressData = null;
uncompressedOutputStream.write(
kv.getBuffer(), kv.getOffset(), kv.getLength());
this.rawKVs = rawKVs;
}

/**
@@ -74,19 +74,20 @@ public class EncodedDataBlock {
* @return Forwards sequential iterator.
*/
public Iterator<KeyValue> getIterator() {
final int uncompressedSize = uncompressedOutputStream.size();
final ByteArrayInputStream bais = new ByteArrayInputStream(
getCompressedData());
final int rawSize = rawKVs.length;
byte[] encodedDataWithHeader = getEncodedData();
int bytesToSkip = encodingCtx.getHeaderSize() + Bytes.SIZEOF_SHORT;
ByteArrayInputStream bais = new ByteArrayInputStream(encodedDataWithHeader,
bytesToSkip, encodedDataWithHeader.length - bytesToSkip);
final DataInputStream dis = new DataInputStream(bais);

return new Iterator<KeyValue>() {
private ByteBuffer decompressedData = null;

@Override
public boolean hasNext() {
if (decompressedData == null) {
return uncompressedSize > 0;
return rawSize > 0;
}
return decompressedData.hasRemaining();
}
@@ -95,7 +96,7 @@ public class EncodedDataBlock {
public KeyValue next() {
if (decompressedData == null) {
try {
decompressedData = dataBlockEncoder.uncompressKeyValues(
decompressedData = dataBlockEncoder.decodeKeyValues(
dis, includesMemstoreTS);
} catch (IOException e) {
throw new RuntimeException("Problem with data block encoder, " +
@@ -129,99 +130,86 @@ public class EncodedDataBlock {
* @return Size in bytes of compressed data.
*/
public int getSize() {
return getCompressedData().length;
return getEncodedData().length;
}

/**
* Find the size of compressed data assuming that buffer will be compressed
* using given algorithm.
* @param compressor Algorithm used for compression.
* @param buffer Array to be compressed.
* @param algo compression algorithm
* @param compressor compressor already requested from codec
* @param inputBuffer Array to be compressed.
* @param offset Offset to beginning of the data.
* @param length Length to be compressed.
* @return Size of compressed data in bytes.
* @throws IOException
*/
public static int checkCompressedSize(Compressor compressor, byte[] buffer,
int offset, int length) {
byte[] compressedBuffer = new byte[buffer.length];
// in fact the buffer could be of any positive size
compressor.setInput(buffer, offset, length);
compressor.finish();
int currentPos = 0;
while (!compressor.finished()) {
try {
// we don't care about compressed data,
// we just want to callculate number of bytes
currentPos += compressor.compress(compressedBuffer, 0,
compressedBuffer.length);
} catch (IOException e) {
throw new RuntimeException(
"For some reason compressor couldn't read data. " +
"It is likely a problem with " +
compressor.getClass().getName(), e);
}
public static int getCompressedSize(Algorithm algo, Compressor compressor,
byte[] inputBuffer, int offset, int length) throws IOException {
DataOutputStream compressedStream = new DataOutputStream(
new NullOutputStream());
if (compressor != null) {
compressor.reset();
}
return currentPos;
OutputStream compressingStream = algo.createCompressionStream(
compressedStream, compressor, 0);

compressingStream.write(inputBuffer, offset, length);
compressingStream.flush();
compressingStream.close();

return compressedStream.size();
}

/**
* Estimate size after second stage of compression (e.g. LZO).
* @param compressor Algorithm which will be used for compressions.
* @param comprAlgo compression algorithm to be used for compression
* @param compressor compressor corresponding to the given compression
* algorithm
* @return Size after second stage of compression.
*/
public int checkCompressedSize(Compressor compressor) {
// compress
byte[] compressedBytes = getCompressedData();
return checkCompressedSize(compressor, compressedBytes, 0,
public int getEncodedCompressedSize(Algorithm comprAlgo,
Compressor compressor) throws IOException {
byte[] compressedBytes = getEncodedData();
return getCompressedSize(comprAlgo, compressor, compressedBytes, 0,
compressedBytes.length);
}

private byte[] getCompressedData() {
// is cached
if (cacheCompressData != null) {
return cacheCompressData;
/** @return encoded data with header */
private byte[] getEncodedData() {
if (cachedEncodedData != null) {
return cachedEncodedData;
}
cacheCompressData = encodeData();

return cacheCompressData;
cachedEncodedData = encodeData();
return cachedEncodedData;
}

private ByteBuffer getUncompressedBuffer() {
if (uncompressedBuffer == null ||
uncompressedBuffer.limit() < uncompressedOutputStream.size()) {
uncompressedBuffer = ByteBuffer.wrap(
uncompressedOutputStream.toByteArray());
if (rawBuffer == null || rawBuffer.limit() < rawKVs.length) {
rawBuffer = ByteBuffer.wrap(rawKVs);
}
return uncompressedBuffer;
return rawBuffer;
}

/**
* Do the encoding .
* @return encoded byte buffer.
* Do the encoding, but do not cache the encoded data.
* @return encoded data block with header and checksum
*/
public byte[] encodeData() {
try {
this.dataBlockEncoder.compressKeyValues(
getUncompressedBuffer(), includesMemstoreTS, encodingCxt);
this.dataBlockEncoder.encodeKeyValues(
getUncompressedBuffer(), includesMemstoreTS, encodingCtx);
} catch (IOException e) {
throw new RuntimeException(String.format(
"Bug in encoding part of algorithm %s. " +
"Probably it requested more bytes than are available.",
toString()), e);
}
return encodingCxt.getUncompressedBytesWithHeader();
return encodingCtx.getUncompressedBytesWithHeader();
}

@Override
public String toString() {
return dataBlockEncoder.toString();
}

/**
* Get uncompressed buffer.
* @return The buffer.
*/
public byte[] getRawKeyValues() {
return uncompressedOutputStream.toByteArray();
}
}
@@ -362,7 +362,7 @@ public class FastDiffDeltaEncoder extends BufferedDataBlockEncoder {
}

@Override
public ByteBuffer uncompressKeyValues(DataInputStream source,
public ByteBuffer decodeKeyValues(DataInputStream source,
int allocHeaderLength, int skipLastBytes, boolean includesMemstoreTS)
throws IOException {
int decompressedSize = source.readInt();
@@ -16,10 +16,14 @@
*/
package org.apache.hadoop.hbase.io.encoding;

import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.util.Arrays;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.KeyValue;
@@ -91,7 +95,7 @@ public class PrefixKeyDeltaEncoder extends BufferedDataBlockEncoder {
}

@Override
public ByteBuffer uncompressKeyValues(DataInputStream source,
public ByteBuffer decodeKeyValues(DataInputStream source,
int allocHeaderLength, int skipLastBytes, boolean includesMemstoreTS)
throws IOException {
int decompressedSize = source.readInt();
@@ -101,7 +105,7 @@ public class PrefixKeyDeltaEncoder extends BufferedDataBlockEncoder {
int prevKeyOffset = 0;

while (source.available() > skipLastBytes) {
prevKeyOffset = uncompressKeyValue(source, buffer, prevKeyOffset);
prevKeyOffset = decodeKeyValue(source, buffer, prevKeyOffset);
afterDecodingKeyValue(source, buffer, includesMemstoreTS);
}
@@ -113,7 +117,7 @@ public class PrefixKeyDeltaEncoder extends BufferedDataBlockEncoder {
return buffer;
}

private int uncompressKeyValue(DataInputStream source, ByteBuffer buffer,
private int decodeKeyValue(DataInputStream source, ByteBuffer buffer,
int prevKeyOffset)
throws IOException, EncoderBufferTooSmallException {
int keyLength = ByteBufferUtils.readCompressedInt(source);
@@ -33,13 +33,12 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext;
import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext;
import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
import org.apache.hadoop.hbase.regionserver.MemStore;
import org.apache.hadoop.hbase.regionserver.metrics.SchemaConfigured;
@@ -123,6 +122,7 @@ public class HFileBlock extends SchemaConfigured implements Cacheable {

/** Just an array of bytes of the right size. */
public static final byte[] DUMMY_HEADER = new byte[HEADER_SIZE];

static final byte[] DUMMY_HEADER_NO_CHECKSUM =
new byte[HEADER_SIZE_NO_CHECKSUM];
@@ -835,10 +835,10 @@ public class HFileBlock extends SchemaConfigured implements Cacheable {
putHeader(onDiskBytesWithHeader, 0,
onDiskBytesWithHeader.length + numBytes,
uncompressedBytesWithHeader.length, onDiskBytesWithHeader.length);
//set the header for the uncompressed bytes (for cache-on-write)
// set the header for the uncompressed bytes (for cache-on-write)
putHeader(uncompressedBytesWithHeader, 0,
onDiskBytesWithHeader.length + numBytes,
uncompressedBytesWithHeader.length, onDiskBytesWithHeader.length);
uncompressedBytesWithHeader.length, onDiskBytesWithHeader.length);

onDiskChecksum = new byte[numBytes];
ChecksumUtil.generateChecksums(
@@ -221,7 +221,7 @@ public class HFileDataBlockEncoderImpl implements HFileDataBlockEncoder {
HFileBlockEncodingContext encodeCtx) {
DataBlockEncoder encoder = algo.getEncoder();
try {
encoder.compressKeyValues(in, includesMemstoreTS, encodeCtx);
encoder.encodeKeyValues(in, includesMemstoreTS, encodeCtx);
} catch (IOException e) {
throw new RuntimeException(String.format(
"Bug in data block encoder "
@@ -85,7 +85,7 @@ public class TestDataBlockEncoders {
HFileBlockEncodingContext encodingCtx =
getEncodingContext(Compression.Algorithm.NONE, encoding);

encoder.compressKeyValues(dataset, includesMemstoreTS,
encoder.encodeKeyValues(dataset, includesMemstoreTS,
encodingCtx);

byte[] encodedBytesWithHeader =
@@ -106,7 +106,7 @@ public class TestDataBlockEncoders {
DataInputStream dis = new DataInputStream(bais);
ByteBuffer actualDataset;
DataBlockEncoder encoder = encoding.getEncoder();
actualDataset = encoder.uncompressKeyValues(dis, includesMemstoreTS);
actualDataset = encoder.decodeKeyValues(dis, includesMemstoreTS);

dataset.rewind();
actualDataset.rewind();
@@ -435,7 +435,7 @@ public class TestHFileBlock {
if (encoder != null) {
HFileBlockEncodingContext encodingCtx =
encoder.newDataBlockEncodingContext(algo, encoding, dummyHeader);
encoder.compressKeyValues(rawBuf, includesMemstoreTS,
encoder.encodeKeyValues(rawBuf, includesMemstoreTS,
encodingCtx);
encodedResultWithHeader =
encodingCtx.getUncompressedBytesWithHeader();
@@ -20,7 +20,7 @@ import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
@@ -38,14 +38,15 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.encoding.EncodedDataBlock;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.encoding.EncodedDataBlock;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.Compression;
import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.Decompressor;
@@ -60,33 +61,85 @@ public class DataBlockEncodingTool {
private static final boolean includesMemstoreTS = true;

/**
* How many times should benchmark run.
* More times means better data in terms of statistics.
* It has to be larger than BENCHMARK_N_OMIT.
* How many times to run the benchmark. More times means better data in terms
* of statistics but slower execution. Has to be strictly larger than
* {@link DEFAULT_BENCHMARK_N_OMIT}.
*/
public static int BENCHMARK_N_TIMES = 12;
private static final int DEFAULT_BENCHMARK_N_TIMES = 12;

/**
* How many first runs should omit benchmark.
* Usually it is one in order to exclude setup cost.
* Has to be 0 or larger.
* How many first runs should not be included in the benchmark. Done in order
* to exclude setup cost.
*/
public static int BENCHMARK_N_OMIT = 2;
private static final int DEFAULT_BENCHMARK_N_OMIT = 2;

/** HFile name to be used in benchmark */
private static final String OPT_HFILE_NAME = "f";

/** Maximum number of key/value pairs to process in a single benchmark run */
private static final String OPT_KV_LIMIT = "n";

/** Whether to run a benchmark to measure read throughput */
private static final String OPT_MEASURE_THROUGHPUT = "b";

/** If this is specified, no correctness testing will be done */
private static final String OPT_OMIT_CORRECTNESS_TEST = "c";

/** What encoding algorithm to test */
private static final String OPT_ENCODING_ALGORITHM = "a";

/** Number of times to run each benchmark */
private static final String OPT_BENCHMARK_N_TIMES = "t";

/** Number of first runs of every benchmark to omit from statistics */
private static final String OPT_BENCHMARK_N_OMIT = "omit";

/** Compression algorithm to use if not specified on the command line */
private static final Algorithm DEFAULT_COMPRESSION =
Compression.Algorithm.GZ;

private List<EncodedDataBlock> codecs = new ArrayList<EncodedDataBlock>();
private int totalPrefixLength = 0;
private int totalKeyLength = 0;
private int totalValueLength = 0;
private int totalKeyRedundancyLength = 0;
private static final DecimalFormat DELIMITED_DECIMAL_FORMAT =
new DecimalFormat();

final private String compressionAlgorithmName;
final private Algorithm compressionAlgorithm;
final private Compressor compressor;
final private Decompressor decompressor;
static {
DELIMITED_DECIMAL_FORMAT.setGroupingSize(3);
}

private static final String PCT_FORMAT = "%.2f %%";
private static final String INT_FORMAT = "%d";

private static int benchmarkNTimes = DEFAULT_BENCHMARK_N_TIMES;
private static int benchmarkNOmit = DEFAULT_BENCHMARK_N_OMIT;

private List<EncodedDataBlock> codecs = new ArrayList<EncodedDataBlock>();
private long totalPrefixLength = 0;
private long totalKeyLength = 0;
private long totalValueLength = 0;
private long totalKeyRedundancyLength = 0;
private long totalCFLength = 0;

private byte[] rawKVs;

private final String compressionAlgorithmName;
private final Algorithm compressionAlgorithm;
private final Compressor compressor;
private final Decompressor decompressor;

private static enum Manipulation {
ENCODING,
DECODING,
COMPRESSION,
DECOMPRESSION;

@Override
public String toString() {
String s = super.toString();
StringBuilder sb = new StringBuilder();
sb.append(s.charAt(0));
sb.append(s.substring(1).toLowerCase());
return sb.toString();
}
}

/**
* @param compressionAlgorithmName What kind of algorithm should be used
@@ -110,22 +163,21 @@ public class DataBlockEncodingTool {
throws IOException {
scanner.seek(KeyValue.LOWESTKEY);

KeyValue currentKv;
KeyValue currentKV;

byte[] previousKey = null;
byte[] currentKey;

DataBlockEncoding[] encodings = DataBlockEncoding.values();
for(DataBlockEncoding encoding : encodings) {
DataBlockEncoder d = encoding.getEncoder();
codecs.add(new EncodedDataBlock(d, includesMemstoreTS, encoding));
}

ByteArrayOutputStream uncompressedOutputStream =
new ByteArrayOutputStream();

int j = 0;
while ((currentKv = scanner.next()) != null && j < kvLimit) {
while ((currentKV = scanner.next()) != null && j < kvLimit) {
// Iterates through key/value pairs
j++;
currentKey = currentKv.getKey();
currentKey = currentKV.getKey();
if (previousKey != null) {
for (int i = 0; i < previousKey.length && i < currentKey.length &&
previousKey[i] == currentKey[i]; ++i) {
@@ -133,22 +185,36 @@ public class DataBlockEncodingTool {
}
}

for (EncodedDataBlock codec : codecs) {
codec.addKv(currentKv);
}
uncompressedOutputStream.write(currentKV.getBuffer(),
currentKV.getOffset(), currentKV.getLength());

previousKey = currentKey;

totalPrefixLength += currentKv.getLength() - currentKv.getKeyLength() -
currentKv.getValueLength();
totalKeyLength += currentKv.getKeyLength();
totalValueLength += currentKv.getValueLength();
int kLen = currentKV.getKeyLength();
int vLen = currentKV.getValueLength();
int cfLen = currentKV.getFamilyLength(currentKV.getFamilyOffset());
int restLen = currentKV.getLength() - kLen - vLen;

totalKeyLength += kLen;
totalValueLength += vLen;
totalPrefixLength += restLen;
totalCFLength += cfLen;
}

rawKVs = uncompressedOutputStream.toByteArray();

for (DataBlockEncoding encoding : encodings) {
if (encoding == DataBlockEncoding.NONE) {
continue;
}
DataBlockEncoder d = encoding.getEncoder();
codecs.add(new EncodedDataBlock(d, includesMemstoreTS, encoding, rawKVs));
}
}

/**
* Verify if all data block encoders are working properly.
*
*
* @param scanner Of file which was compressed.
* @param kvLimit Maximal count of KeyValue which will be processed.
* @return true if all data block encoders compressed/decompressed correctly.
@@ -226,15 +292,14 @@ public class DataBlockEncodingTool {
/**
* Benchmark codec's speed.
*/
public void benchmarkCodecs() {
public void benchmarkCodecs() throws IOException {
LOG.info("Starting a throughput benchmark for data block encoding codecs");
int prevTotalSize = -1;
for (EncodedDataBlock codec : codecs) {
prevTotalSize = benchmarkEncoder(prevTotalSize, codec);
}

byte[] buffer = codecs.get(0).getRawKeyValues();

benchmarkDefaultCompression(prevTotalSize, buffer);
benchmarkDefaultCompression(prevTotalSize, rawKVs);
}

/**
@@ -250,7 +315,7 @@ public class DataBlockEncodingTool {

// decompression time
List<Long> durations = new ArrayList<Long>();
for (int itTime = 0; itTime < BENCHMARK_N_TIMES; ++itTime) {
for (int itTime = 0; itTime < benchmarkNTimes; ++itTime) {
totalSize = 0;

Iterator<KeyValue> it;
@@ -264,7 +329,7 @@ public class DataBlockEncodingTool {
totalSize += it.next().getLength();
}
final long finishTime = System.nanoTime();
if (itTime >= BENCHMARK_N_OMIT) {
if (itTime >= benchmarkNOmit) {
durations.add(finishTime - startTime);
}
@@ -275,26 +340,27 @@ public class DataBlockEncodingTool {
prevTotalSize = totalSize;
}

// compression time
List<Long> compressDurations = new ArrayList<Long>();
for (int itTime = 0; itTime < BENCHMARK_N_TIMES; ++itTime) {
List<Long> encodingDurations = new ArrayList<Long>();
for (int itTime = 0; itTime < benchmarkNTimes; ++itTime) {
final long startTime = System.nanoTime();
codec.encodeData();
final long finishTime = System.nanoTime();
if (itTime >= BENCHMARK_N_OMIT) {
compressDurations.add(finishTime - startTime);
if (itTime >= benchmarkNOmit) {
encodingDurations.add(finishTime - startTime);
}
}

System.out.println(codec.toString() + ":");
printBenchmarkResult(totalSize, compressDurations, false);
printBenchmarkResult(totalSize, durations, true);
printBenchmarkResult(totalSize, encodingDurations, Manipulation.ENCODING);
printBenchmarkResult(totalSize, durations, Manipulation.DECODING);
System.out.println();

return prevTotalSize;
}

private void benchmarkDefaultCompression(int totalSize, byte[] rawBuffer) {
benchmarkAlgorithm(compressionAlgorithm, compressor, decompressor,
private void benchmarkDefaultCompression(int totalSize, byte[] rawBuffer)
throws IOException {
benchmarkAlgorithm(compressionAlgorithm,
compressionAlgorithmName.toUpperCase(), rawBuffer, 0, totalSize);
}
@@ -307,24 +373,22 @@ public class DataBlockEncodingTool {
* @param buffer Buffer to be compressed.
* @param offset Position of the beginning of the data.
* @param length Length of data in buffer.
* @throws IOException
*/
public static void benchmarkAlgorithm(
Compression.Algorithm algorithm,
Compressor compressorCodec,
Decompressor decompressorCodec,
String name,
byte[] buffer, int offset, int length) {
public void benchmarkAlgorithm(Compression.Algorithm algorithm, String name,
byte[] buffer, int offset, int length) throws IOException {
System.out.println(name + ":");

// compress it
List<Long> compressDurations = new ArrayList<Long>();
ByteArrayOutputStream compressedStream = new ByteArrayOutputStream();
OutputStream compressingStream;
CompressionOutputStream compressingStream =
algorithm.createPlainCompressionStream(compressedStream, compressor);
try {
for (int itTime = 0; itTime < BENCHMARK_N_TIMES; ++itTime) {
for (int itTime = 0; itTime < benchmarkNTimes; ++itTime) {
final long startTime = System.nanoTime();
compressingStream = algorithm.createCompressionStream(
compressedStream, compressorCodec, 0);
compressingStream.resetState();
compressedStream.reset();
compressingStream.write(buffer, offset, length);
compressingStream.flush();
compressedStream.toByteArray();
@@ -332,36 +396,31 @@ public class DataBlockEncodingTool {
final long finishTime = System.nanoTime();

// add time record
if (itTime >= BENCHMARK_N_OMIT) {
if (itTime >= benchmarkNOmit) {
compressDurations.add(finishTime - startTime);
}

if (itTime + 1 < BENCHMARK_N_TIMES) { // not the last one
compressedStream.reset();
}
}
} catch (IOException e) {
throw new RuntimeException(String.format(
"Benchmark, or encoding algorithm '%s' cause some stream problems",
name), e);
}
printBenchmarkResult(length, compressDurations, false);

compressingStream.close();
printBenchmarkResult(length, compressDurations, Manipulation.COMPRESSION);

byte[] compBuffer = compressedStream.toByteArray();

// uncompress it several times and measure performance
List<Long> durations = new ArrayList<Long>();
for (int itTime = 0; itTime < BENCHMARK_N_TIMES; ++itTime) {
for (int itTime = 0; itTime < benchmarkNTimes; ++itTime) {
final long startTime = System.nanoTime();
byte[] newBuf = new byte[length + 1];

try {

ByteArrayInputStream downStream = new ByteArrayInputStream(compBuffer,
0, compBuffer.length);
InputStream decompressedStream = algorithm.createDecompressionStream(
downStream, decompressorCodec, 0);
downStream, decompressor, 0);

int destOffset = 0;
int nextChunk;
@@ -370,7 +429,7 @@ public class DataBlockEncodingTool {
}
decompressedStream.close();

// iterate over KeyValue
// iterate over KeyValues
KeyValue kv;
for (int pos = 0; pos < length; pos += kv.getLength()) {
kv = new KeyValue(newBuf, pos);
@@ -396,91 +455,116 @@ public class DataBlockEncodingTool {
}

// add time record
if (itTime >= BENCHMARK_N_OMIT) {
if (itTime >= benchmarkNOmit) {
durations.add(finishTime - startTime);
}
}
printBenchmarkResult(length, durations, true);
printBenchmarkResult(length, durations, Manipulation.DECOMPRESSION);
System.out.println();
}

private static final double BYTES_IN_MB = 1024 * 1024.0;
private static final double NS_IN_SEC = 1000.0 * 1000.0 * 1000.0;
private static final double MB_SEC_COEF = NS_IN_SEC / BYTES_IN_MB;

private static void printBenchmarkResult(int totalSize,
List<Long> durationsInNanoSed, boolean isDecompression) {
List<Long> durationsInNanoSec, Manipulation manipulation) {
final int n = durationsInNanoSec.size();
long meanTime = 0;
for (long time : durationsInNanoSed) {
for (long time : durationsInNanoSec) {
meanTime += time;
}
meanTime /= durationsInNanoSed.size();
meanTime /= n;

long standardDev = 0;
for (long time : durationsInNanoSed) {
standardDev += (time - meanTime) * (time - meanTime);
double meanMBPerSec = totalSize * MB_SEC_COEF / meanTime;
double mbPerSecSTD = 0;
if (n > 0) {
for (long time : durationsInNanoSec) {
double mbPerSec = totalSize * MB_SEC_COEF / time;
double dev = mbPerSec - meanMBPerSec;
mbPerSecSTD += dev * dev;
}
mbPerSecSTD = Math.sqrt(mbPerSecSTD / n);
}
standardDev = (long) Math.sqrt(standardDev / durationsInNanoSed.size());

final double million = 1000.0 * 1000.0 * 1000.0;
double mbPerSec = (totalSize * million) / (1024.0 * 1024.0 * meanTime);
double mbPerSecDev = (totalSize * million) /
(1024.0 * 1024.0 * (meanTime - standardDev));
outputTuple(manipulation + " performance", "%6.2f MB/s (+/- %.2f MB/s)",
meanMBPerSec, mbPerSecSTD);
}

System.out.println(String.format(
" %s performance:%s %6.2f MB/s (+/- %.2f MB/s)",
isDecompression ? "Decompression" : "Compression",
isDecompression ? "" : " ",
mbPerSec, mbPerSecDev - mbPerSec));
private static void outputTuple(String caption, String format,
Object... values) {
if (format.startsWith(INT_FORMAT)) {
format = "%s" + format.substring(INT_FORMAT.length());
values[0] = DELIMITED_DECIMAL_FORMAT.format(values[0]);
}

StringBuilder sb = new StringBuilder();
sb.append(" ");
sb.append(caption);
sb.append(":");

String v = String.format(format, values);
int padding = 60 - sb.length() - v.length();
for (int i = 0; i < padding; ++i) {
sb.append(' ');
}
sb.append(v);
System.out.println(sb);
}

/**
* Display statistics of different compression algorithms.
* @throws IOException
*/
public void displayStatistics() {
int totalLength = totalPrefixLength + totalKeyLength + totalValueLength;
if (compressor != null) { // might be null e.g. for pure-Java GZIP
compressor.reset();
}
public void displayStatistics() throws IOException {
final String comprAlgo = compressionAlgorithmName.toUpperCase();
long rawBytes = totalKeyLength + totalPrefixLength + totalValueLength;

for(EncodedDataBlock codec : codecs) {
System.out.println("Raw data size:");
outputTuple("Raw bytes", INT_FORMAT, rawBytes);
outputTuplePct("Key bytes", totalKeyLength);
outputTuplePct("Value bytes", totalValueLength);
outputTuplePct("KV infrastructure", totalPrefixLength);
outputTuplePct("CF overhead", totalCFLength);
outputTuplePct("Total key redundancy", totalKeyRedundancyLength);

int compressedSize = EncodedDataBlock.getCompressedSize(
compressionAlgorithm, compressor, rawKVs, 0, rawKVs.length);
outputTuple(comprAlgo + " only size", INT_FORMAT,
compressedSize);
outputSavings(comprAlgo + " only", compressedSize, rawBytes);
System.out.println();

for (EncodedDataBlock codec : codecs) {
System.out.println(codec.toString());
int saved = totalKeyLength + totalPrefixLength + totalValueLength
- codec.getSize();
System.out.println(
String.format(" Saved bytes: %8d", saved));
double keyRatio = (saved * 100.0) / (totalPrefixLength + totalKeyLength);
double allRatio = (saved * 100.0) / totalLength;
System.out.println(
String.format(" Key compression ratio: %.2f %%", keyRatio));
System.out.println(
String.format(" All compression ratio: %.2f %%", allRatio));
long encodedBytes = codec.getSize();
outputTuple("Encoded bytes", INT_FORMAT, encodedBytes);
outputSavings("Key encoding", encodedBytes - totalValueLength,
rawBytes - totalValueLength);
outputSavings("Total encoding", encodedBytes, rawBytes);

String compressedSizeCaption =
String.format(" %s compressed size: ",
compressionAlgorithmName.toUpperCase());
String compressOnlyRatioCaption =
String.format(" %s compression ratio: ",
compressionAlgorithmName.toUpperCase());
int encodedCompressedSize = codec.getEncodedCompressedSize(
compressionAlgorithm, compressor);
outputTuple("Encoding + " + comprAlgo + " size", INT_FORMAT,
encodedCompressedSize);
outputSavings("Encoding + " + comprAlgo, encodedCompressedSize, rawBytes);
outputSavings("Encoding with " + comprAlgo, encodedCompressedSize,
compressedSize);

if (compressor != null) {
int compressedSize = codec.checkCompressedSize(compressor);
System.out.println(compressedSizeCaption +
String.format("%8d", compressedSize));
double compressOnlyRatio =
100.0 * (1.0 - compressedSize / (0.0 + totalLength));
System.out.println(compressOnlyRatioCaption
+ String.format("%.2f %%", compressOnlyRatio));
} else {
System.out.println(compressedSizeCaption + "N/A");
System.out.println(compressOnlyRatioCaption + "N/A");
}
System.out.println();
}
}

System.out.println(
String.format("Total KV prefix length: %8d", totalPrefixLength));
System.out.println(
String.format("Total key length: %8d", totalKeyLength));
System.out.println(
String.format("Total key redundancy: %8d",
totalKeyRedundancyLength));
System.out.println(
String.format("Total value length: %8d", totalValueLength));
private void outputTuplePct(String caption, long size) {
outputTuple(caption, INT_FORMAT + " (" + PCT_FORMAT + ")",
size, size * 100.0 / rawKVs.length);
}

private void outputSavings(String caption, long part, long whole) {
double pct = 100.0 * (1 - 1.0 * part / whole);
double times = whole * 1.0 / part;
outputTuple(caption + " savings", PCT_FORMAT + " (%.2f x)",
pct, times);
}

/**
@@ -540,22 +624,35 @@ public class DataBlockEncodingTool {
}

/**
* A command line interface to benchmarks.
* A command line interface to benchmarks. Parses command-line arguments and
* runs the appropriate benchmarks.
* @param args Should have length at least 1 and holds the file path to HFile.
* @throws IOException If you specified the wrong file.
*/
public static void main(final String[] args) throws IOException {
// set up user arguments
Options options = new Options();
options.addOption("f", true, "HFile to analyse (REQUIRED)");
options.getOption("f").setArgName("FILENAME");
options.addOption("n", true,
"Limit number of KeyValue which will be analysed");
options.getOption("n").setArgName("NUMBER");
options.addOption("b", false, "Measure read throughput");
options.addOption("c", false, "Omit corectness tests.");
options.addOption("a", true,
options.addOption(OPT_HFILE_NAME, true, "HFile to analyse (REQUIRED)");
options.getOption(OPT_HFILE_NAME).setArgName("FILENAME");
options.addOption(OPT_KV_LIMIT, true,
"Maximum number of KeyValues to process. A benchmark stops running " +
"after iterating over this many KV pairs.");
options.getOption(OPT_KV_LIMIT).setArgName("NUMBER");
options.addOption(OPT_MEASURE_THROUGHPUT, false,
"Measure read throughput");
options.addOption(OPT_OMIT_CORRECTNESS_TEST, false,
"Omit corectness tests.");
options.addOption(OPT_ENCODING_ALGORITHM, true,
"What kind of compression algorithm use for comparison.");
options.addOption(OPT_BENCHMARK_N_TIMES,
true, "Number of times to run each benchmark. Default value: " +
DEFAULT_BENCHMARK_N_TIMES);
options.addOption(OPT_BENCHMARK_N_OMIT, true,
"Number of first runs of every benchmark to exclude from "
+ "statistics (" + DEFAULT_BENCHMARK_N_OMIT
+ " by default, so that " + "only the last "
+ (DEFAULT_BENCHMARK_N_TIMES - DEFAULT_BENCHMARK_N_OMIT)
+ " times are included in statistics.)");

// parse arguments
CommandLineParser parser = new PosixParser();
@@ -569,24 +666,44 @@ public class DataBlockEncodingTool {
}

int kvLimit = Integer.MAX_VALUE;
if (cmd.hasOption("n")) {
kvLimit = Integer.parseInt(cmd.getOptionValue("n"));
if (cmd.hasOption(OPT_KV_LIMIT)) {
kvLimit = Integer.parseInt(cmd.getOptionValue(OPT_KV_LIMIT));
}

// basic argument sanity checks
if (!cmd.hasOption("f")) {
System.err.println("ERROR: Filename is required!");
if (!cmd.hasOption(OPT_HFILE_NAME)) {
LOG.error("Please specify HFile name using the " + OPT_HFILE_NAME
+ " option");
printUsage(options);
System.exit(-1);
}

String pathName = cmd.getOptionValue("f");
String pathName = cmd.getOptionValue(OPT_HFILE_NAME);
String compressionName = DEFAULT_COMPRESSION.getName();
if (cmd.hasOption("a")) {
compressionName = cmd.getOptionValue("a").toLowerCase();
if (cmd.hasOption(OPT_ENCODING_ALGORITHM)) {
compressionName =
cmd.getOptionValue(OPT_ENCODING_ALGORITHM).toLowerCase();
}
boolean doBenchmark = cmd.hasOption("b");
boolean doVerify = !cmd.hasOption("c");
boolean doBenchmark = cmd.hasOption(OPT_MEASURE_THROUGHPUT);
boolean doVerify = !cmd.hasOption(OPT_OMIT_CORRECTNESS_TEST);

if (cmd.hasOption(OPT_BENCHMARK_N_TIMES)) {
benchmarkNTimes = Integer.valueOf(cmd.getOptionValue(
OPT_BENCHMARK_N_TIMES));
}
if (cmd.hasOption(OPT_BENCHMARK_N_OMIT)) {
benchmarkNOmit =
Integer.valueOf(cmd.getOptionValue(OPT_BENCHMARK_N_OMIT));
}
if (benchmarkNTimes < benchmarkNOmit) {
LOG.error("The number of times to run each benchmark ("
+ benchmarkNTimes
+ ") must be greater than the number of benchmark runs to exclude "
+ "from statistics (" + benchmarkNOmit + ")");
System.exit(1);
}
LOG.info("Running benchmark " + benchmarkNTimes + " times. " +
"Excluding the first " + benchmarkNOmit + " times from statistics.");

final Configuration conf = HBaseConfiguration.create();
try {
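With the options introduced above, a run of the tool might look something like the line below (the HFile path is made up, and the exact launcher depends on how the class is placed on the classpath):

  DataBlockEncodingTool -f /path/to/some/hfile -n 500000 -b -a lzo -t 12 -omit 2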
@@ -173,7 +173,6 @@ public class EncodedSeekPerformanceTest {
List<HFileDataBlockEncoder> encoders =
new ArrayList<HFileDataBlockEncoder>();

encoders.add(new HFileDataBlockEncoderImpl(DataBlockEncoding.NONE));
for (DataBlockEncoding encodingAlgo : DataBlockEncoding.values()) {
encoders.add(new HFileDataBlockEncoderImpl(DataBlockEncoding.NONE,
encodingAlgo));