HBASE-26316 Per-table or per-CF compression codec setting overrides (#3730)
We get and retain Compressor instances in HFileBlockDefaultEncodingContext, and could in theory call Compressor#reinit when setting up the context, to update compression parameters like level and buffer size, but we do not plumb through the CompoundConfiguration from the Store into the encoding context. As a consequence we can only update codec parameters globally in system site conf files. Fine grained configurability is important for algorithms like ZStandard (ZSTD), which offers more than 20 compression levels, where at level 1 it is almost as fast as LZ4, and where at higher levels it utilizes computationally expensive techniques to rival LZMA at compression ratio but trades off significantly for reduced compresson throughput. The ZSTD level that should be set for a given column family or table will vary by use case. Signed-off-by: Viraj Jasani <vjasani@apache.org> Conflicts: hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdDecompressor.java hbase-server/src/test/java/org/apache/hadoop/hbase/io/compress/HFileTestBase.java hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestDataBlockEncoders.java hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestSeekToBlockWithEncoders.java
This commit is contained in:
parent
7e574c9f1d
commit
b6bb18022f
|
@ -0,0 +1,31 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with this
|
||||
* work for additional information regarding copyright ownership. The ASF
|
||||
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations under
|
||||
* the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.io.compress;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* This is a marker interface that indicates if a compressor or decompressor
|
||||
* type can support reinitialization via reinit(Configuration conf).
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public interface CanReinit {
|
||||
|
||||
void reinit(Configuration conf);
|
||||
|
||||
}
|
|
@ -22,12 +22,9 @@ import java.io.FilterOutputStream;
|
|||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
|
||||
import org.apache.hadoop.conf.Configurable;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.io.util.BlockIOUtils;
|
||||
import org.apache.hadoop.hbase.nio.ByteBuff;
|
||||
import org.apache.hadoop.io.compress.CodecPool;
|
||||
import org.apache.hadoop.io.compress.CompressionCodec;
|
||||
import org.apache.hadoop.io.compress.CompressionInputStream;
|
||||
|
@ -521,37 +518,6 @@ public final class Compression {
|
|||
return ret;
|
||||
}
|
||||
|
||||
/**
|
||||
* Decompresses data from the given stream using the configured compression algorithm. It will
|
||||
* throw an exception if the dest buffer does not have enough space to hold the decompressed data.
|
||||
* @param dest the output buffer
|
||||
* @param bufferedBoundedStream a stream to read compressed data from, bounded to the exact amount
|
||||
* of compressed data
|
||||
* @param uncompressedSize uncompressed data size, header not included
|
||||
* @param compressAlgo compression algorithm used
|
||||
* @throws IOException if any IO error happen
|
||||
*/
|
||||
public static void decompress(ByteBuff dest, InputStream bufferedBoundedStream,
|
||||
int uncompressedSize, Compression.Algorithm compressAlgo) throws IOException {
|
||||
if (dest.remaining() < uncompressedSize) {
|
||||
throw new IllegalArgumentException("Output buffer does not have enough space to hold "
|
||||
+ uncompressedSize + " decompressed bytes, available: " + dest.remaining());
|
||||
}
|
||||
|
||||
Decompressor decompressor = null;
|
||||
try {
|
||||
decompressor = compressAlgo.getDecompressor();
|
||||
try (InputStream is =
|
||||
compressAlgo.createDecompressionStream(bufferedBoundedStream, decompressor, 0)) {
|
||||
BlockIOUtils.readFullyWithHeapBuffer(is, dest, uncompressedSize);
|
||||
}
|
||||
} finally {
|
||||
if (decompressor != null) {
|
||||
compressAlgo.returnDecompressor(decompressor);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Load a codec implementation for an algorithm using the supplied configuration.
|
||||
* @param conf the configuration to use
|
||||
|
|
|
@ -18,6 +18,8 @@ package org.apache.hadoop.hbase.io.encoding;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.ByteBufferKeyOnlyKeyValue;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
|
@ -29,14 +31,15 @@ import org.apache.yetus.audience.InterfaceAudience;
|
|||
public abstract class AbstractDataBlockEncoder implements DataBlockEncoder {
|
||||
|
||||
@Override
|
||||
public HFileBlockEncodingContext newDataBlockEncodingContext(
|
||||
public HFileBlockEncodingContext newDataBlockEncodingContext(Configuration conf,
|
||||
DataBlockEncoding encoding, byte[] header, HFileContext meta) {
|
||||
return new HFileBlockDefaultEncodingContext(encoding, header, meta);
|
||||
return new HFileBlockDefaultEncodingContext(conf, encoding, header, meta);
|
||||
}
|
||||
|
||||
@Override
|
||||
public HFileBlockDecodingContext newDataBlockDecodingContext(HFileContext meta) {
|
||||
return new HFileBlockDefaultDecodingContext(meta);
|
||||
public HFileBlockDecodingContext newDataBlockDecodingContext(Configuration conf,
|
||||
HFileContext meta) {
|
||||
return new HFileBlockDefaultDecodingContext(conf, meta);
|
||||
}
|
||||
|
||||
protected void postEncoding(HFileBlockEncodingContext encodingCtx)
|
||||
|
|
|
@ -20,6 +20,8 @@ import java.io.DataInputStream;
|
|||
import java.io.DataOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.CellComparator;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFileContext;
|
||||
|
@ -91,6 +93,8 @@ public interface DataBlockEncoder {
|
|||
/**
|
||||
* Creates a encoder specific encoding context
|
||||
*
|
||||
* @param conf
|
||||
* store configuration
|
||||
* @param encoding
|
||||
* encoding strategy used
|
||||
* @param headerBytes
|
||||
|
@ -100,18 +104,20 @@ public interface DataBlockEncoder {
|
|||
* HFile meta data
|
||||
* @return a newly created encoding context
|
||||
*/
|
||||
HFileBlockEncodingContext newDataBlockEncodingContext(
|
||||
HFileBlockEncodingContext newDataBlockEncodingContext(Configuration conf,
|
||||
DataBlockEncoding encoding, byte[] headerBytes, HFileContext meta);
|
||||
|
||||
/**
|
||||
* Creates an encoder specific decoding context, which will prepare the data
|
||||
* before actual decoding
|
||||
*
|
||||
* @param conf
|
||||
* store configuration
|
||||
* @param meta
|
||||
* HFile meta data
|
||||
* @return a newly created decoding context
|
||||
*/
|
||||
HFileBlockDecodingContext newDataBlockDecodingContext(HFileContext meta);
|
||||
HFileBlockDecodingContext newDataBlockDecodingContext(Configuration conf, HFileContext meta);
|
||||
|
||||
/**
|
||||
* An interface which enable to seek while underlying data is encoded.
|
||||
|
|
|
@ -27,6 +27,7 @@ import java.util.ArrayList;
|
|||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import org.apache.commons.lang3.NotImplementedException;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
|
@ -57,6 +58,7 @@ public class EncodedDataBlock {
|
|||
private HFileContext meta;
|
||||
|
||||
private final DataBlockEncoding encoding;
|
||||
private final Configuration conf;
|
||||
|
||||
// The is for one situation that there are some cells includes tags and others are not.
|
||||
// isTagsLenZero stores if cell tags length is zero before doing encoding since we need
|
||||
|
@ -68,21 +70,23 @@ public class EncodedDataBlock {
|
|||
|
||||
/**
|
||||
* Create a buffer which will be encoded using dataBlockEncoder.
|
||||
* @param conf store configuration
|
||||
* @param dataBlockEncoder Algorithm used for compression.
|
||||
* @param encoding encoding type used
|
||||
* @param rawKVs
|
||||
* @param meta
|
||||
* @param rawKVs raw KVs
|
||||
* @param meta hfile context
|
||||
*/
|
||||
public EncodedDataBlock(DataBlockEncoder dataBlockEncoder, DataBlockEncoding encoding,
|
||||
byte[] rawKVs, HFileContext meta) {
|
||||
public EncodedDataBlock(Configuration conf, DataBlockEncoder dataBlockEncoder,
|
||||
DataBlockEncoding encoding, byte[] rawKVs, HFileContext meta) {
|
||||
Preconditions.checkNotNull(encoding,
|
||||
"Cannot create encoded data block with null encoder");
|
||||
this.dataBlockEncoder = dataBlockEncoder;
|
||||
this.encoding = encoding;
|
||||
encodingCtx = dataBlockEncoder.newDataBlockEncodingContext(encoding,
|
||||
encodingCtx = dataBlockEncoder.newDataBlockEncodingContext(conf, encoding,
|
||||
HConstants.HFILEBLOCK_DUMMY_HEADER, meta);
|
||||
this.rawKVs = rawKVs;
|
||||
this.meta = meta;
|
||||
this.conf = conf;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -115,7 +119,7 @@ public class EncodedDataBlock {
|
|||
if (decompressedData == null) {
|
||||
try {
|
||||
decompressedData = dataBlockEncoder.decodeKeyValues(dis, dataBlockEncoder
|
||||
.newDataBlockDecodingContext(meta));
|
||||
.newDataBlockDecodingContext(conf, meta));
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException("Problem with data block encoder, " +
|
||||
"most likely it requested more bytes than are available.", e);
|
||||
|
|
|
@ -20,8 +20,10 @@ import java.io.DataInputStream;
|
|||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.io.ByteBuffInputStream;
|
||||
import org.apache.hadoop.hbase.io.TagCompressionContext;
|
||||
import org.apache.hadoop.hbase.io.compress.CanReinit;
|
||||
import org.apache.hadoop.hbase.io.compress.Compression;
|
||||
import org.apache.hadoop.hbase.io.crypto.Cipher;
|
||||
import org.apache.hadoop.hbase.io.crypto.Decryptor;
|
||||
|
@ -30,6 +32,7 @@ import org.apache.hadoop.hbase.io.hfile.HFileContext;
|
|||
import org.apache.hadoop.hbase.io.util.BlockIOUtils;
|
||||
import org.apache.hadoop.hbase.nio.ByteBuff;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.io.compress.Decompressor;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/**
|
||||
|
@ -41,10 +44,12 @@ import org.apache.yetus.audience.InterfaceAudience;
|
|||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class HFileBlockDefaultDecodingContext implements HFileBlockDecodingContext {
|
||||
private final Configuration conf;
|
||||
private final HFileContext fileContext;
|
||||
private TagCompressionContext tagCompressionContext;
|
||||
|
||||
public HFileBlockDefaultDecodingContext(HFileContext fileContext) {
|
||||
public HFileBlockDefaultDecodingContext(Configuration conf, HFileContext fileContext) {
|
||||
this.conf = conf;
|
||||
this.fileContext = fileContext;
|
||||
}
|
||||
|
||||
|
@ -87,8 +92,24 @@ public class HFileBlockDefaultDecodingContext implements HFileBlockDecodingConte
|
|||
|
||||
Compression.Algorithm compression = fileContext.getCompression();
|
||||
if (compression != Compression.Algorithm.NONE) {
|
||||
Compression.decompress(blockBufferWithoutHeader, dataInputStream,
|
||||
uncompressedSizeWithoutHeader, compression);
|
||||
Decompressor decompressor = null;
|
||||
try {
|
||||
decompressor = compression.getDecompressor();
|
||||
// Some algorithms don't return decompressors and accept null as a valid parameter for
|
||||
// same when creating decompression streams. We can ignore these cases wrt reinit.
|
||||
if (decompressor instanceof CanReinit) {
|
||||
((CanReinit)decompressor).reinit(conf);
|
||||
}
|
||||
try (InputStream is =
|
||||
compression.createDecompressionStream(dataInputStream, decompressor, 0)) {
|
||||
BlockIOUtils.readFullyWithHeapBuffer(is, blockBufferWithoutHeader,
|
||||
uncompressedSizeWithoutHeader);
|
||||
}
|
||||
} finally {
|
||||
if (decompressor != null) {
|
||||
compression.returnDecompressor(decompressor);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
BlockIOUtils.readFullyWithHeapBuffer(dataInputStream, blockBufferWithoutHeader,
|
||||
onDiskSizeWithoutHeader);
|
||||
|
|
|
@ -22,6 +22,8 @@ import java.io.DataOutputStream;
|
|||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.security.SecureRandom;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
|
||||
import org.apache.hadoop.hbase.io.TagCompressionContext;
|
||||
import org.apache.hadoop.hbase.io.compress.Compression;
|
||||
|
@ -72,18 +74,26 @@ public class HFileBlockDefaultEncodingContext implements HFileBlockEncodingConte
|
|||
private EncodingState encoderState;
|
||||
|
||||
/**
|
||||
* @param conf configuraton
|
||||
* @param encoding encoding used
|
||||
* @param headerBytes dummy header bytes
|
||||
* @param fileContext HFile meta data
|
||||
*/
|
||||
public HFileBlockDefaultEncodingContext(DataBlockEncoding encoding, byte[] headerBytes,
|
||||
HFileContext fileContext) {
|
||||
public HFileBlockDefaultEncodingContext(Configuration conf, DataBlockEncoding encoding,
|
||||
byte[] headerBytes, HFileContext fileContext) {
|
||||
this.encodingAlgo = encoding;
|
||||
this.fileContext = fileContext;
|
||||
Compression.Algorithm compressionAlgorithm =
|
||||
fileContext.getCompression() == null ? NONE : fileContext.getCompression();
|
||||
if (compressionAlgorithm != NONE) {
|
||||
if (compressor == null) {
|
||||
compressor = compressionAlgorithm.getCompressor();
|
||||
// Some algorithms don't return compressors and accept null as a valid parameter for
|
||||
// same when creating compression streams. We can ignore these cases wrt reinit.
|
||||
if (compressor != null) {
|
||||
compressor.reinit(conf);
|
||||
}
|
||||
}
|
||||
compressedByteStream = new ByteArrayOutputStream();
|
||||
try {
|
||||
compressionStream =
|
||||
|
|
|
@ -20,6 +20,7 @@ import java.io.IOException;
|
|||
import java.nio.ByteBuffer;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.io.compress.CanReinit;
|
||||
import org.apache.hadoop.hbase.io.compress.CompressionUtil;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
|
@ -32,7 +33,7 @@ import io.airlift.compress.Compressor;
|
|||
*/
|
||||
@InterfaceAudience.Private
|
||||
public abstract class HadoopCompressor<T extends Compressor>
|
||||
implements org.apache.hadoop.io.compress.Compressor {
|
||||
implements CanReinit, org.apache.hadoop.io.compress.Compressor {
|
||||
|
||||
protected static final Logger LOG = LoggerFactory.getLogger(HadoopCompressor.class);
|
||||
protected T compressor;
|
||||
|
|
|
@ -18,7 +18,9 @@
|
|||
package org.apache.hadoop.hbase.io.compress.aircompressor;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.io.compress.Compression;
|
||||
import org.apache.hadoop.hbase.io.compress.HFileTestBase;
|
||||
import org.apache.hadoop.hbase.testclassification.IOTests;
|
||||
|
@ -35,9 +37,11 @@ public class TestHFileCompressionLz4 extends HFileTestBase {
|
|||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestHFileCompressionLz4.class);
|
||||
|
||||
private static Configuration conf;
|
||||
|
||||
@BeforeClass
|
||||
public static void setUpBeforeClass() throws Exception {
|
||||
Configuration conf = TEST_UTIL.getConfiguration();
|
||||
conf = TEST_UTIL.getConfiguration();
|
||||
conf.set(Compression.LZ4_CODEC_CLASS_KEY, Lz4Codec.class.getCanonicalName());
|
||||
Compression.Algorithm.LZ4.reload(conf);
|
||||
HFileTestBase.setUpBeforeClass();
|
||||
|
@ -45,7 +49,9 @@ public class TestHFileCompressionLz4 extends HFileTestBase {
|
|||
|
||||
@Test
|
||||
public void test() throws Exception {
|
||||
doTest(Compression.Algorithm.LZ4);
|
||||
Path path = new Path(TEST_UTIL.getDataTestDir(),
|
||||
HBaseTestingUtility.getRandomUUID().toString() + ".hfile");
|
||||
doTest(conf, path, Compression.Algorithm.LZ4);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -18,7 +18,9 @@
|
|||
package org.apache.hadoop.hbase.io.compress.aircompressor;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.io.compress.Compression;
|
||||
import org.apache.hadoop.hbase.io.compress.HFileTestBase;
|
||||
import org.apache.hadoop.hbase.testclassification.IOTests;
|
||||
|
@ -35,9 +37,11 @@ public class TestHFileCompressionLzo extends HFileTestBase {
|
|||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestHFileCompressionLzo.class);
|
||||
|
||||
private static Configuration conf;
|
||||
|
||||
@BeforeClass
|
||||
public static void setUpBeforeClass() throws Exception {
|
||||
Configuration conf = TEST_UTIL.getConfiguration();
|
||||
conf = TEST_UTIL.getConfiguration();
|
||||
conf.set(Compression.LZO_CODEC_CLASS_KEY, LzoCodec.class.getCanonicalName());
|
||||
Compression.Algorithm.LZO.reload(conf);
|
||||
HFileTestBase.setUpBeforeClass();
|
||||
|
@ -45,7 +49,9 @@ public class TestHFileCompressionLzo extends HFileTestBase {
|
|||
|
||||
@Test
|
||||
public void test() throws Exception {
|
||||
doTest(Compression.Algorithm.LZO);
|
||||
Path path = new Path(TEST_UTIL.getDataTestDir(),
|
||||
HBaseTestingUtility.getRandomUUID().toString() + ".hfile");
|
||||
doTest(conf, path, Compression.Algorithm.LZO);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -18,7 +18,9 @@
|
|||
package org.apache.hadoop.hbase.io.compress.aircompressor;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.io.compress.Compression;
|
||||
import org.apache.hadoop.hbase.io.compress.HFileTestBase;
|
||||
import org.apache.hadoop.hbase.testclassification.IOTests;
|
||||
|
@ -35,9 +37,11 @@ public class TestHFileCompressionSnappy extends HFileTestBase {
|
|||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestHFileCompressionSnappy.class);
|
||||
|
||||
private static Configuration conf;
|
||||
|
||||
@BeforeClass
|
||||
public static void setUpBeforeClass() throws Exception {
|
||||
Configuration conf = TEST_UTIL.getConfiguration();
|
||||
conf = TEST_UTIL.getConfiguration();
|
||||
conf.set(Compression.SNAPPY_CODEC_CLASS_KEY, SnappyCodec.class.getCanonicalName());
|
||||
Compression.Algorithm.SNAPPY.reload(conf);
|
||||
HFileTestBase.setUpBeforeClass();
|
||||
|
@ -45,7 +49,9 @@ public class TestHFileCompressionSnappy extends HFileTestBase {
|
|||
|
||||
@Test
|
||||
public void test() throws Exception {
|
||||
doTest(Compression.Algorithm.SNAPPY);
|
||||
Path path = new Path(TEST_UTIL.getDataTestDir(),
|
||||
HBaseTestingUtility.getRandomUUID().toString() + ".hfile");
|
||||
doTest(conf, path, Compression.Algorithm.SNAPPY);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -18,7 +18,9 @@
|
|||
package org.apache.hadoop.hbase.io.compress.aircompressor;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.io.compress.Compression;
|
||||
import org.apache.hadoop.hbase.io.compress.HFileTestBase;
|
||||
import org.apache.hadoop.hbase.testclassification.IOTests;
|
||||
|
@ -35,9 +37,11 @@ public class TestHFileCompressionZstd extends HFileTestBase {
|
|||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestHFileCompressionZstd.class);
|
||||
|
||||
private static Configuration conf;
|
||||
|
||||
@BeforeClass
|
||||
public static void setUpBeforeClass() throws Exception {
|
||||
Configuration conf = TEST_UTIL.getConfiguration();
|
||||
conf = TEST_UTIL.getConfiguration();
|
||||
conf.set(Compression.ZSTD_CODEC_CLASS_KEY, ZstdCodec.class.getCanonicalName());
|
||||
Compression.Algorithm.ZSTD.reload(conf);
|
||||
HFileTestBase.setUpBeforeClass();
|
||||
|
@ -45,7 +49,9 @@ public class TestHFileCompressionZstd extends HFileTestBase {
|
|||
|
||||
@Test
|
||||
public void test() throws Exception {
|
||||
doTest(Compression.Algorithm.ZSTD);
|
||||
Path path = new Path(TEST_UTIL.getDataTestDir(),
|
||||
HBaseTestingUtility.getRandomUUID().toString() + ".hfile");
|
||||
doTest(conf, path, Compression.Algorithm.ZSTD);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -20,6 +20,7 @@ import java.io.IOException;
|
|||
import java.nio.ByteBuffer;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.io.compress.CanReinit;
|
||||
import org.apache.hadoop.hbase.io.compress.CompressionUtil;
|
||||
import org.apache.hadoop.io.compress.Compressor;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
@ -33,7 +34,7 @@ import net.jpountz.lz4.LZ4Factory;
|
|||
* Hadoop compressor glue for lz4-java.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class Lz4Compressor implements Compressor {
|
||||
public class Lz4Compressor implements CanReinit, Compressor {
|
||||
|
||||
protected static final Logger LOG = LoggerFactory.getLogger(Lz4Compressor.class);
|
||||
protected LZ4Compressor compressor;
|
||||
|
|
|
@ -18,7 +18,9 @@
|
|||
package org.apache.hadoop.hbase.io.compress.lz4;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.io.compress.Compression;
|
||||
import org.apache.hadoop.hbase.io.compress.HFileTestBase;
|
||||
import org.apache.hadoop.hbase.testclassification.IOTests;
|
||||
|
@ -35,9 +37,11 @@ public class TestHFileCompressionLz4 extends HFileTestBase {
|
|||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestHFileCompressionLz4.class);
|
||||
|
||||
private static Configuration conf;
|
||||
|
||||
@BeforeClass
|
||||
public static void setUpBeforeClass() throws Exception {
|
||||
Configuration conf = TEST_UTIL.getConfiguration();
|
||||
conf = TEST_UTIL.getConfiguration();
|
||||
conf.set(Compression.LZ4_CODEC_CLASS_KEY, Lz4Codec.class.getCanonicalName());
|
||||
Compression.Algorithm.LZ4.reload(conf);
|
||||
HFileTestBase.setUpBeforeClass();
|
||||
|
@ -45,7 +49,9 @@ public class TestHFileCompressionLz4 extends HFileTestBase {
|
|||
|
||||
@Test
|
||||
public void test() throws Exception {
|
||||
doTest(Compression.Algorithm.LZ4);
|
||||
Path path = new Path(TEST_UTIL.getDataTestDir(),
|
||||
HBaseTestingUtility.getRandomUUID().toString() + ".hfile");
|
||||
doTest(conf, path, Compression.Algorithm.LZ4);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -20,6 +20,7 @@ import java.io.IOException;
|
|||
import java.nio.ByteBuffer;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.io.compress.CanReinit;
|
||||
import org.apache.hadoop.hbase.io.compress.CompressionUtil;
|
||||
import org.apache.hadoop.io.compress.Compressor;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
@ -32,7 +33,7 @@ import org.xerial.snappy.Snappy;
|
|||
* Hadoop compressor glue for Xerial Snappy.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class SnappyCompressor implements Compressor {
|
||||
public class SnappyCompressor implements CanReinit, Compressor {
|
||||
|
||||
protected static final Logger LOG = LoggerFactory.getLogger(SnappyCompressor.class);
|
||||
protected ByteBuffer inBuf, outBuf;
|
||||
|
|
|
@ -18,7 +18,9 @@
|
|||
package org.apache.hadoop.hbase.io.compress.xerial;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.io.compress.Compression;
|
||||
import org.apache.hadoop.hbase.io.compress.HFileTestBase;
|
||||
import org.apache.hadoop.hbase.testclassification.IOTests;
|
||||
|
@ -35,9 +37,11 @@ public class TestHFileCompressionSnappy extends HFileTestBase {
|
|||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestHFileCompressionSnappy.class);
|
||||
|
||||
private static Configuration conf;
|
||||
|
||||
@BeforeClass
|
||||
public static void setUpBeforeClass() throws Exception {
|
||||
Configuration conf = TEST_UTIL.getConfiguration();
|
||||
conf = TEST_UTIL.getConfiguration();
|
||||
conf.set(Compression.SNAPPY_CODEC_CLASS_KEY, SnappyCodec.class.getCanonicalName());
|
||||
Compression.Algorithm.SNAPPY.reload(conf);
|
||||
HFileTestBase.setUpBeforeClass();
|
||||
|
@ -45,7 +49,9 @@ public class TestHFileCompressionSnappy extends HFileTestBase {
|
|||
|
||||
@Test
|
||||
public void test() throws Exception {
|
||||
doTest(Compression.Algorithm.SNAPPY);
|
||||
Path path = new Path(TEST_UTIL.getDataTestDir(),
|
||||
HBaseTestingUtility.getRandomUUID().toString() + ".hfile");
|
||||
doTest(conf, path, Compression.Algorithm.SNAPPY);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -17,8 +17,12 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.io.compress.xz;
|
||||
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.io.compress.Compression;
|
||||
import org.apache.hadoop.hbase.io.compress.HFileTestBase;
|
||||
import org.apache.hadoop.hbase.testclassification.IOTests;
|
||||
|
@ -35,9 +39,11 @@ public class TestHFileCompressionLzma extends HFileTestBase {
|
|||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestHFileCompressionLzma.class);
|
||||
|
||||
private static Configuration conf;
|
||||
|
||||
@BeforeClass
|
||||
public static void setUpBeforeClass() throws Exception {
|
||||
Configuration conf = TEST_UTIL.getConfiguration();
|
||||
conf = TEST_UTIL.getConfiguration();
|
||||
conf.set(Compression.LZMA_CODEC_CLASS_KEY, LzmaCodec.class.getCanonicalName());
|
||||
Compression.Algorithm.LZMA.reload(conf);
|
||||
HFileTestBase.setUpBeforeClass();
|
||||
|
@ -45,7 +51,26 @@ public class TestHFileCompressionLzma extends HFileTestBase {
|
|||
|
||||
@Test
|
||||
public void test() throws Exception {
|
||||
doTest(Compression.Algorithm.LZMA);
|
||||
Path path = new Path(TEST_UTIL.getDataTestDir(),
|
||||
HBaseTestingUtility.getRandomUUID().toString() + ".hfile");
|
||||
doTest(conf, path, Compression.Algorithm.LZMA);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReconfLevels() throws Exception {
|
||||
Path path_1 = new Path(TEST_UTIL.getDataTestDir(),
|
||||
HBaseTestingUtility.getRandomUUID().toString() + ".1.hfile");
|
||||
Path path_2 = new Path(TEST_UTIL.getDataTestDir(),
|
||||
HBaseTestingUtility.getRandomUUID().toString() + ".2.hfile");
|
||||
conf.setInt(LzmaCodec.LZMA_LEVEL_KEY, 1);
|
||||
doTest(conf, path_1, Compression.Algorithm.LZMA);
|
||||
long len_1 = FS.getFileStatus(path_1).getLen();
|
||||
conf.setInt(LzmaCodec.LZMA_LEVEL_KEY, 9);
|
||||
doTest(conf, path_2, Compression.Algorithm.LZMA);
|
||||
long len_2 = FS.getFileStatus(path_2).getLen();
|
||||
LOG.info("Level 1 len {}", len_1);
|
||||
LOG.info("Level 9 len {}", len_2);
|
||||
assertTrue("Reconfiguraton with LZMA_LEVEL_KEY did not seem to work", len_1 > len_2);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -20,6 +20,7 @@ import java.io.IOException;
|
|||
import java.nio.ByteBuffer;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.io.compress.CanReinit;
|
||||
import org.apache.hadoop.hbase.io.compress.CompressionUtil;
|
||||
import org.apache.hadoop.io.compress.Compressor;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
@ -32,7 +33,7 @@ import com.github.luben.zstd.Zstd;
|
|||
* Hadoop compressor glue for zstd-jni.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class ZstdCompressor implements Compressor {
|
||||
public class ZstdCompressor implements CanReinit, Compressor {
|
||||
|
||||
protected static final Logger LOG = LoggerFactory.getLogger(ZstdCompressor.class);
|
||||
protected int level, bufferSize;
|
||||
|
@ -40,7 +41,7 @@ public class ZstdCompressor implements Compressor {
|
|||
protected boolean finish, finished;
|
||||
protected long bytesRead, bytesWritten;
|
||||
|
||||
ZstdCompressor(int level, int bufferSize) {
|
||||
ZstdCompressor(final int level, final int bufferSize) {
|
||||
this.level = level;
|
||||
this.bufferSize = bufferSize;
|
||||
this.inBuf = ByteBuffer.allocateDirect(bufferSize);
|
||||
|
@ -49,7 +50,7 @@ public class ZstdCompressor implements Compressor {
|
|||
}
|
||||
|
||||
@Override
|
||||
public int compress(byte[] b, int off, int len) throws IOException {
|
||||
public int compress(final byte[] b, final int off, final int len) throws IOException {
|
||||
// If we have previously compressed our input and still have some buffered bytes
|
||||
// remaining, provide them to the caller.
|
||||
if (outBuf.hasRemaining()) {
|
||||
|
@ -73,7 +74,7 @@ public class ZstdCompressor implements Compressor {
|
|||
} else {
|
||||
outBuf.clear();
|
||||
}
|
||||
int written = Zstd.compress(outBuf, inBuf, level, true);
|
||||
int written = Zstd.compress(outBuf, inBuf, level);
|
||||
bytesWritten += written;
|
||||
inBuf.clear();
|
||||
LOG.trace("compress: compressed {} -> {} (level {})", uncompressed, written, level);
|
||||
|
@ -127,7 +128,7 @@ public class ZstdCompressor implements Compressor {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void reinit(Configuration conf) {
|
||||
public void reinit(final Configuration conf) {
|
||||
LOG.trace("reinit");
|
||||
if (conf != null) {
|
||||
// Level might have changed
|
||||
|
@ -156,12 +157,12 @@ public class ZstdCompressor implements Compressor {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void setDictionary(byte[] b, int off, int len) {
|
||||
public void setDictionary(final byte[] b, final int off, final int len) {
|
||||
throw new UnsupportedOperationException("setDictionary is not supported");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setInput(byte[] b, int off, int len) {
|
||||
public void setInput(final byte[] b, final int off, final int len) {
|
||||
LOG.trace("setInput: off={} len={}", off, len);
|
||||
if (inBuf.remaining() < len) {
|
||||
// Get a new buffer that can accomodate the accumulated input plus the additional
|
||||
|
@ -181,7 +182,7 @@ public class ZstdCompressor implements Compressor {
|
|||
|
||||
// Package private
|
||||
|
||||
int maxCompressedLength(int len) {
|
||||
int maxCompressedLength(final int len) {
|
||||
return (int) Zstd.compressBound(len);
|
||||
}
|
||||
|
||||
|
|
|
@ -18,34 +18,36 @@ package org.apache.hadoop.hbase.io.compress.zstd;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.io.compress.CanReinit;
|
||||
import org.apache.hadoop.hbase.io.compress.CompressionUtil;
|
||||
import org.apache.hadoop.io.compress.Decompressor;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.github.luben.zstd.Zstd;
|
||||
|
||||
/**
|
||||
* Hadoop decompressor glue for zstd-java.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class ZstdDecompressor implements Decompressor {
|
||||
public class ZstdDecompressor implements CanReinit, Decompressor {
|
||||
|
||||
protected static final Logger LOG = LoggerFactory.getLogger(ZstdDecompressor.class);
|
||||
protected ByteBuffer inBuf, outBuf;
|
||||
protected int bufferSize;
|
||||
protected int inLen;
|
||||
protected boolean finished;
|
||||
|
||||
ZstdDecompressor(int bufferSize) {
|
||||
ZstdDecompressor(final int bufferSize) {
|
||||
this.bufferSize = bufferSize;
|
||||
this.inBuf = ByteBuffer.allocateDirect(bufferSize);
|
||||
this.outBuf = ByteBuffer.allocateDirect(bufferSize);
|
||||
this.outBuf.position(bufferSize);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int decompress(byte[] b, int off, int len) throws IOException {
|
||||
public int decompress(final byte[] b, final int off, final int len) throws IOException {
|
||||
if (outBuf.hasRemaining()) {
|
||||
int remaining = outBuf.remaining(), n = Math.min(remaining, len);
|
||||
outBuf.get(b, off, n);
|
||||
|
@ -57,7 +59,8 @@ public class ZstdDecompressor implements Decompressor {
|
|||
int remaining = inBuf.remaining();
|
||||
inLen -= remaining;
|
||||
outBuf.clear();
|
||||
int written = Zstd.decompress(outBuf, inBuf);
|
||||
int written;
|
||||
written = Zstd.decompress(outBuf, inBuf);
|
||||
inBuf.clear();
|
||||
LOG.trace("decompress: decompressed {} -> {}", remaining, written);
|
||||
outBuf.flip();
|
||||
|
@ -106,24 +109,25 @@ public class ZstdDecompressor implements Decompressor {
|
|||
|
||||
@Override
|
||||
public boolean needsInput() {
|
||||
boolean b = (inBuf.position() == 0);
|
||||
final boolean b = (inBuf.position() == 0);
|
||||
LOG.trace("needsInput: {}", b);
|
||||
return b;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setDictionary(byte[] b, int off, int len) {
|
||||
throw new UnsupportedOperationException("setDictionary is not supported");
|
||||
public void setDictionary(final byte[] b, final int off, final int len) {
|
||||
LOG.trace("setDictionary: off={} len={}", off, len);
|
||||
throw new UnsupportedOperationException("setDictionary not supported");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setInput(byte[] b, int off, int len) {
|
||||
public void setInput(final byte[] b, final int off, final int len) {
|
||||
LOG.trace("setInput: off={} len={}", off, len);
|
||||
if (inBuf.remaining() < len) {
|
||||
// Get a new buffer that can accomodate the accumulated input plus the additional
|
||||
// input that would cause a buffer overflow without reallocation.
|
||||
// This condition should be fortunately rare, because it is expensive.
|
||||
int needed = CompressionUtil.roundInt2(inBuf.capacity() + len);
|
||||
final int needed = CompressionUtil.roundInt2(inBuf.capacity() + len);
|
||||
LOG.trace("setInput: resize inBuf {}", needed);
|
||||
ByteBuffer newBuf = ByteBuffer.allocateDirect(needed);
|
||||
inBuf.flip();
|
||||
|
@ -135,4 +139,19 @@ public class ZstdDecompressor implements Decompressor {
|
|||
finished = false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void reinit(final Configuration conf) {
|
||||
LOG.trace("reinit");
|
||||
if (conf != null) {
|
||||
// Buffer size might have changed
|
||||
int newBufferSize = ZstdCodec.getBufferSize(conf);
|
||||
if (bufferSize != newBufferSize) {
|
||||
bufferSize = newBufferSize;
|
||||
this.inBuf = ByteBuffer.allocateDirect(bufferSize);
|
||||
this.outBuf = ByteBuffer.allocateDirect(bufferSize);
|
||||
}
|
||||
}
|
||||
reset();
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -17,8 +17,12 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.io.compress.zstd;
|
||||
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.io.compress.Compression;
|
||||
import org.apache.hadoop.hbase.io.compress.HFileTestBase;
|
||||
import org.apache.hadoop.hbase.testclassification.IOTests;
|
||||
|
@ -35,9 +39,11 @@ public class TestHFileCompressionZstd extends HFileTestBase {
|
|||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestHFileCompressionZstd.class);
|
||||
|
||||
private static Configuration conf;
|
||||
|
||||
@BeforeClass
|
||||
public static void setUpBeforeClass() throws Exception {
|
||||
Configuration conf = TEST_UTIL.getConfiguration();
|
||||
conf = TEST_UTIL.getConfiguration();
|
||||
conf.set(Compression.ZSTD_CODEC_CLASS_KEY, ZstdCodec.class.getCanonicalName());
|
||||
Compression.Algorithm.ZSTD.reload(conf);
|
||||
HFileTestBase.setUpBeforeClass();
|
||||
|
@ -45,7 +51,26 @@ public class TestHFileCompressionZstd extends HFileTestBase {
|
|||
|
||||
@Test
|
||||
public void test() throws Exception {
|
||||
doTest(Compression.Algorithm.ZSTD);
|
||||
Path path = new Path(TEST_UTIL.getDataTestDir(),
|
||||
HBaseTestingUtility.getRandomUUID().toString() + ".hfile");
|
||||
doTest(conf, path, Compression.Algorithm.ZSTD);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReconfLevels() throws Exception {
|
||||
Path path_1 = new Path(TEST_UTIL.getDataTestDir(),
|
||||
HBaseTestingUtility.getRandomUUID().toString() + ".1.hfile");
|
||||
Path path_2 = new Path(TEST_UTIL.getDataTestDir(),
|
||||
HBaseTestingUtility.getRandomUUID().toString() + ".2.hfile");
|
||||
conf.setInt(ZstdCodec.ZSTD_LEVEL_KEY, 1);
|
||||
doTest(conf, path_1, Compression.Algorithm.ZSTD);
|
||||
long len_1 = FS.getFileStatus(path_1).getLen();
|
||||
conf.setInt(ZstdCodec.ZSTD_LEVEL_KEY, 22);
|
||||
doTest(conf, path_2, Compression.Algorithm.ZSTD);
|
||||
long len_2 = FS.getFileStatus(path_2).getLen();
|
||||
LOG.info("Level 1 len {}", len_1);
|
||||
LOG.info("Level 22 len {}", len_2);
|
||||
assertTrue("Reconfiguraton with ZSTD_LEVEL_KEY did not seem to work", len_1 > len_2);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -91,7 +91,7 @@ public class HFileInputFormat extends FileInputFormat<NullWritable, Cell> {
|
|||
|
||||
// The file info must be loaded before the scanner can be used.
|
||||
// This seems like a bug in HBase, but it's easily worked around.
|
||||
this.scanner = in.getScanner(false, false);
|
||||
this.scanner = in.getScanner(conf, false, false);
|
||||
|
||||
}
|
||||
|
||||
|
|
|
@ -490,7 +490,7 @@ public class TestCellBasedHFileOutputFormat2 {
|
|||
LocatedFileStatus keyFileStatus = iterator.next();
|
||||
HFile.Reader reader =
|
||||
HFile.createReader(fs, keyFileStatus.getPath(), new CacheConfig(conf), true, conf);
|
||||
HFileScanner scanner = reader.getScanner(false, false, false);
|
||||
HFileScanner scanner = reader.getScanner(conf, false, false, false);
|
||||
scanner.seekTo();
|
||||
Cell cell = scanner.getCell();
|
||||
List<Tag> tagsFromCell = TagUtil.asList(cell.getTagsArray(), cell.getTagsOffset(),
|
||||
|
|
|
@ -476,7 +476,7 @@ public class TestHFileOutputFormat2 {
|
|||
LocatedFileStatus keyFileStatus = iterator.next();
|
||||
HFile.Reader reader =
|
||||
HFile.createReader(fs, keyFileStatus.getPath(), new CacheConfig(conf), true, conf);
|
||||
HFileScanner scanner = reader.getScanner(false, false, false);
|
||||
HFileScanner scanner = reader.getScanner(conf, false, false, false);
|
||||
|
||||
kvCount += reader.getEntries();
|
||||
scanner.seekTo();
|
||||
|
@ -525,7 +525,7 @@ public class TestHFileOutputFormat2 {
|
|||
LocatedFileStatus keyFileStatus = iterator.next();
|
||||
HFile.Reader reader =
|
||||
HFile.createReader(fs, keyFileStatus.getPath(), new CacheConfig(conf), true, conf);
|
||||
HFileScanner scanner = reader.getScanner(false, false, false);
|
||||
HFileScanner scanner = reader.getScanner(conf, false, false, false);
|
||||
scanner.seekTo();
|
||||
Cell cell = scanner.getCell();
|
||||
List<Tag> tagsFromCell = PrivateCellUtil.getTags(cell);
|
||||
|
|
|
@ -484,7 +484,7 @@ public class TestImportTSVWithVisibilityLabels implements Configurable {
|
|||
private static int getKVCountFromHfile(FileSystem fs, Path p) throws IOException {
|
||||
Configuration conf = util.getConfiguration();
|
||||
HFile.Reader reader = HFile.createReader(fs, p, new CacheConfig(conf), true, conf);
|
||||
HFileScanner scanner = reader.getScanner(false, false);
|
||||
HFileScanner scanner = reader.getScanner(conf, false, false);
|
||||
scanner.seekTo();
|
||||
int count = 0;
|
||||
do {
|
||||
|
|
|
@ -561,7 +561,7 @@ public class TestImportTsv implements Configurable {
|
|||
private static int getKVCountFromHfile(FileSystem fs, Path p) throws IOException {
|
||||
Configuration conf = util.getConfiguration();
|
||||
HFile.Reader reader = HFile.createReader(fs, p, new CacheConfig(conf), true, conf);
|
||||
HFileScanner scanner = reader.getScanner(false, false);
|
||||
HFileScanner scanner = reader.getScanner(conf, false, false);
|
||||
scanner.seekTo();
|
||||
int count = 0;
|
||||
do {
|
||||
|
|
|
@ -397,7 +397,8 @@ public final class HFile {
|
|||
|
||||
CellComparator getComparator();
|
||||
|
||||
HFileScanner getScanner(boolean cacheBlocks, final boolean pread, final boolean isCompaction);
|
||||
HFileScanner getScanner(Configuration conf, boolean cacheBlocks, boolean pread,
|
||||
boolean isCompaction);
|
||||
|
||||
HFileBlock getMetaBlock(String metaBlockName, boolean cacheBlock) throws IOException;
|
||||
|
||||
|
@ -425,7 +426,7 @@ public final class HFile {
|
|||
void setMetaBlockIndexReader(HFileBlockIndex.ByteArrayKeyBlockIndexReader reader);
|
||||
HFileBlockIndex.ByteArrayKeyBlockIndexReader getMetaBlockIndexReader();
|
||||
|
||||
HFileScanner getScanner(boolean cacheBlocks, boolean pread);
|
||||
HFileScanner getScanner(Configuration conf, boolean cacheBlocks, boolean pread);
|
||||
|
||||
/**
|
||||
* Retrieves general Bloom filter metadata as appropriate for each
|
||||
|
|
|
@ -28,6 +28,8 @@ import java.util.List;
|
|||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
|
@ -837,12 +839,13 @@ public class HFileBlock implements Cacheable {
|
|||
/**
|
||||
* @param dataBlockEncoder data block encoding algorithm to use
|
||||
*/
|
||||
public Writer(HFileDataBlockEncoder dataBlockEncoder, HFileContext fileContext) {
|
||||
this(dataBlockEncoder, fileContext, ByteBuffAllocator.HEAP);
|
||||
public Writer(Configuration conf, HFileDataBlockEncoder dataBlockEncoder,
|
||||
HFileContext fileContext) {
|
||||
this(conf, dataBlockEncoder, fileContext, ByteBuffAllocator.HEAP);
|
||||
}
|
||||
|
||||
public Writer(HFileDataBlockEncoder dataBlockEncoder, HFileContext fileContext,
|
||||
ByteBuffAllocator allocator) {
|
||||
public Writer(Configuration conf, HFileDataBlockEncoder dataBlockEncoder,
|
||||
HFileContext fileContext, ByteBuffAllocator allocator) {
|
||||
if (fileContext.getBytesPerChecksum() < HConstants.HFILEBLOCK_HEADER_SIZE) {
|
||||
throw new RuntimeException("Unsupported value of bytesPerChecksum. " +
|
||||
" Minimum is " + HConstants.HFILEBLOCK_HEADER_SIZE + " but the configured value is " +
|
||||
|
@ -851,10 +854,10 @@ public class HFileBlock implements Cacheable {
|
|||
this.allocator = allocator;
|
||||
this.dataBlockEncoder = dataBlockEncoder != null?
|
||||
dataBlockEncoder: NoOpDataBlockEncoder.INSTANCE;
|
||||
this.dataBlockEncodingCtx = this.dataBlockEncoder.
|
||||
newDataBlockEncodingContext(HConstants.HFILEBLOCK_DUMMY_HEADER, fileContext);
|
||||
// TODO: This should be lazily instantiated since we usually do NOT need this default encoder
|
||||
this.defaultBlockEncodingCtx = new HFileBlockDefaultEncodingContext(null,
|
||||
this.dataBlockEncodingCtx = this.dataBlockEncoder.newDataBlockEncodingContext(conf,
|
||||
HConstants.HFILEBLOCK_DUMMY_HEADER, fileContext);
|
||||
// TODO: This should be lazily instantiated
|
||||
this.defaultBlockEncodingCtx = new HFileBlockDefaultEncodingContext(conf, null,
|
||||
HConstants.HFILEBLOCK_DUMMY_HEADER, fileContext);
|
||||
// TODO: Set BAOS initial size. Use fileContext.getBlocksize() and add for header/checksum
|
||||
baosInMemory = new ByteArrayOutputStream();
|
||||
|
@ -1345,7 +1348,7 @@ public class HFileBlock implements Cacheable {
|
|||
HFileBlockDecodingContext getDefaultBlockDecodingContext();
|
||||
|
||||
void setIncludesMemStoreTS(boolean includesMemstoreTS);
|
||||
void setDataBlockEncoder(HFileDataBlockEncoder encoder);
|
||||
void setDataBlockEncoder(HFileDataBlockEncoder encoder, Configuration conf);
|
||||
|
||||
/**
|
||||
* To close the stream's socket. Note: This can be concurrently called from multiple threads and
|
||||
|
@ -1413,7 +1416,7 @@ public class HFileBlock implements Cacheable {
|
|||
private final Lock streamLock = new ReentrantLock();
|
||||
|
||||
FSReaderImpl(ReaderContext readerContext, HFileContext fileContext,
|
||||
ByteBuffAllocator allocator) throws IOException {
|
||||
ByteBuffAllocator allocator, Configuration conf) throws IOException {
|
||||
this.fileSize = readerContext.getFileSize();
|
||||
this.hfs = readerContext.getFileSystem();
|
||||
if (readerContext.getFilePath() != null) {
|
||||
|
@ -1426,7 +1429,7 @@ public class HFileBlock implements Cacheable {
|
|||
this.streamWrapper = readerContext.getInputStreamWrapper();
|
||||
// Older versions of HBase didn't support checksum.
|
||||
this.streamWrapper.prepareForBlockReader(!fileContext.isUseHBaseChecksum());
|
||||
defaultDecodingCtx = new HFileBlockDefaultDecodingContext(fileContext);
|
||||
defaultDecodingCtx = new HFileBlockDefaultDecodingContext(conf, fileContext);
|
||||
encodedBlockDecodingCtx = defaultDecodingCtx;
|
||||
}
|
||||
|
||||
|
@ -1790,8 +1793,8 @@ public class HFileBlock implements Cacheable {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void setDataBlockEncoder(HFileDataBlockEncoder encoder) {
|
||||
encodedBlockDecodingCtx = encoder.newDataBlockDecodingContext(this.fileContext);
|
||||
public void setDataBlockEncoder(HFileDataBlockEncoder encoder, Configuration conf) {
|
||||
encodedBlockDecodingCtx = encoder.newDataBlockDecodingContext(conf, fileContext);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -18,13 +18,13 @@ package org.apache.hadoop.hbase.io.hfile;
|
|||
|
||||
import java.io.DataOutputStream;
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
|
||||
import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
|
||||
import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* Controls what kind of data block encoding is used. If data block encoding is
|
||||
|
@ -97,11 +97,12 @@ public interface HFileDataBlockEncoder {
|
|||
* encoding context should also perform compression if compressionAlgorithm is
|
||||
* valid.
|
||||
*
|
||||
* @param conf store configuration
|
||||
* @param headerBytes header bytes
|
||||
* @param fileContext HFile meta data
|
||||
* @return a new {@link HFileBlockEncodingContext} object
|
||||
*/
|
||||
HFileBlockEncodingContext newDataBlockEncodingContext(byte[] headerBytes,
|
||||
HFileBlockEncodingContext newDataBlockEncodingContext(Configuration conf, byte[] headerBytes,
|
||||
HFileContext fileContext);
|
||||
|
||||
/**
|
||||
|
@ -109,8 +110,10 @@ public interface HFileDataBlockEncoder {
|
|||
* decoding context should also do decompression if compressionAlgorithm
|
||||
* is valid.
|
||||
*
|
||||
* @param conf store configuration
|
||||
* @param fileContext - HFile meta data
|
||||
* @return a new {@link HFileBlockDecodingContext} object
|
||||
*/
|
||||
HFileBlockDecodingContext newDataBlockDecodingContext(HFileContext fileContext);
|
||||
HFileBlockDecodingContext newDataBlockDecodingContext(Configuration conf,
|
||||
HFileContext fileContext);
|
||||
}
|
||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.io.hfile;
|
|||
import java.io.DataOutputStream;
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
|
||||
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
|
||||
|
@ -108,22 +109,23 @@ public class HFileDataBlockEncoderImpl implements HFileDataBlockEncoder {
|
|||
}
|
||||
|
||||
@Override
|
||||
public HFileBlockEncodingContext newDataBlockEncodingContext(
|
||||
public HFileBlockEncodingContext newDataBlockEncodingContext(Configuration conf,
|
||||
byte[] dummyHeader, HFileContext fileContext) {
|
||||
DataBlockEncoder encoder = encoding.getEncoder();
|
||||
if (encoder != null) {
|
||||
return encoder.newDataBlockEncodingContext(encoding, dummyHeader, fileContext);
|
||||
return encoder.newDataBlockEncodingContext(conf, encoding, dummyHeader, fileContext);
|
||||
}
|
||||
return new HFileBlockDefaultEncodingContext(null, dummyHeader, fileContext);
|
||||
return new HFileBlockDefaultEncodingContext(conf, null, dummyHeader, fileContext);
|
||||
}
|
||||
|
||||
@Override
|
||||
public HFileBlockDecodingContext newDataBlockDecodingContext(HFileContext fileContext) {
|
||||
public HFileBlockDecodingContext newDataBlockDecodingContext(Configuration conf,
|
||||
HFileContext fileContext) {
|
||||
DataBlockEncoder encoder = encoding.getEncoder();
|
||||
if (encoder != null) {
|
||||
return encoder.newDataBlockDecodingContext(fileContext);
|
||||
return encoder.newDataBlockDecodingContext(conf, fileContext);
|
||||
}
|
||||
return new HFileBlockDefaultDecodingContext(fileContext);
|
||||
return new HFileBlockDefaultDecodingContext(conf, fileContext);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -319,7 +319,7 @@ public class HFilePrettyPrinter extends Configured implements Tool {
|
|||
|
||||
if (verbose || printKey || checkRow || checkFamily || printStats || checkMobIntegrity) {
|
||||
// scan over file and read key/value's and check if requested
|
||||
HFileScanner scanner = reader.getScanner(false, false, false);
|
||||
HFileScanner scanner = reader.getScanner(getConf(), false, false, false);
|
||||
fileStats = new KeyValueStatsCollector();
|
||||
boolean shouldScanKeysValues;
|
||||
if (this.isSeekToRow && !Bytes.equals(row, reader.getFirstRowKey().orElse(null))) {
|
||||
|
|
|
@ -141,9 +141,9 @@ public abstract class HFileReaderImpl implements HFile.Reader, Configurable {
|
|||
this.trailer = fileInfo.getTrailer();
|
||||
this.hfileContext = fileInfo.getHFileContext();
|
||||
this.fsBlockReader = new HFileBlock.FSReaderImpl(context, hfileContext,
|
||||
cacheConf.getByteBuffAllocator());
|
||||
cacheConf.getByteBuffAllocator(), conf);
|
||||
this.dataBlockEncoder = HFileDataBlockEncoderImpl.createFromFileInfo(fileInfo);
|
||||
fsBlockReader.setDataBlockEncoder(dataBlockEncoder);
|
||||
fsBlockReader.setDataBlockEncoder(dataBlockEncoder, conf);
|
||||
dataBlockIndexReader = fileInfo.getDataBlockIndexReader();
|
||||
metaBlockIndexReader = fileInfo.getMetaBlockIndexReader();
|
||||
}
|
||||
|
@ -256,7 +256,7 @@ public abstract class HFileReaderImpl implements HFile.Reader, Configurable {
|
|||
@Override
|
||||
public void setDataBlockEncoder(HFileDataBlockEncoder dataBlockEncoder) {
|
||||
this.dataBlockEncoder = dataBlockEncoder;
|
||||
this.fsBlockReader.setDataBlockEncoder(dataBlockEncoder);
|
||||
this.fsBlockReader.setDataBlockEncoder(dataBlockEncoder, conf);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -1444,11 +1444,11 @@ public abstract class HFileReaderImpl implements HFile.Reader, Configurable {
|
|||
private final DataBlockEncoder dataBlockEncoder;
|
||||
|
||||
public EncodedScanner(HFile.Reader reader, boolean cacheBlocks,
|
||||
boolean pread, boolean isCompaction, HFileContext meta) {
|
||||
boolean pread, boolean isCompaction, HFileContext meta, Configuration conf) {
|
||||
super(reader, cacheBlocks, pread, isCompaction);
|
||||
DataBlockEncoding encoding = reader.getDataBlockEncoding();
|
||||
dataBlockEncoder = encoding.getEncoder();
|
||||
decodingCtx = dataBlockEncoder.newDataBlockDecodingContext(meta);
|
||||
decodingCtx = dataBlockEncoder.newDataBlockDecodingContext(conf, meta);
|
||||
seeker = dataBlockEncoder.createSeeker(decodingCtx);
|
||||
}
|
||||
|
||||
|
@ -1636,16 +1636,17 @@ public abstract class HFileReaderImpl implements HFile.Reader, Configurable {
|
|||
* {@link HFileScanner#seekTo(Cell)} to position an start the read. There is
|
||||
* nothing to clean up in a Scanner. Letting go of your references to the
|
||||
* scanner is sufficient. NOTE: Do not use this overload of getScanner for
|
||||
* compactions. See {@link #getScanner(boolean, boolean, boolean)}
|
||||
* compactions. See {@link #getScanner(Configuration, boolean, boolean, boolean)}
|
||||
*
|
||||
* @param conf Store configuration.
|
||||
* @param cacheBlocks True if we should cache blocks read in by this scanner.
|
||||
* @param pread Use positional read rather than seek+read if true (pread is
|
||||
* better for random reads, seek+read is better scanning).
|
||||
* @return Scanner on this file.
|
||||
*/
|
||||
@Override
|
||||
public HFileScanner getScanner(boolean cacheBlocks, final boolean pread) {
|
||||
return getScanner(cacheBlocks, pread, false);
|
||||
public HFileScanner getScanner(Configuration conf, boolean cacheBlocks, final boolean pread) {
|
||||
return getScanner(conf, cacheBlocks, pread, false);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1653,6 +1654,8 @@ public abstract class HFileReaderImpl implements HFile.Reader, Configurable {
|
|||
* {@link HFileScanner#seekTo(Cell)} to position an start the read. There is
|
||||
* nothing to clean up in a Scanner. Letting go of your references to the
|
||||
* scanner is sufficient.
|
||||
* @param conf
|
||||
* Store configuration.
|
||||
* @param cacheBlocks
|
||||
* True if we should cache blocks read in by this scanner.
|
||||
* @param pread
|
||||
|
@ -1663,10 +1666,10 @@ public abstract class HFileReaderImpl implements HFile.Reader, Configurable {
|
|||
* @return Scanner on this file.
|
||||
*/
|
||||
@Override
|
||||
public HFileScanner getScanner(boolean cacheBlocks, final boolean pread,
|
||||
public HFileScanner getScanner(Configuration conf, boolean cacheBlocks, final boolean pread,
|
||||
final boolean isCompaction) {
|
||||
if (dataBlockEncoder.useEncodedScanner()) {
|
||||
return new EncodedScanner(this, cacheBlocks, pread, isCompaction, this.hfileContext);
|
||||
return new EncodedScanner(this, cacheBlocks, pread, isCompaction, this.hfileContext, conf);
|
||||
}
|
||||
return new HFileScannerImpl(this, cacheBlocks, pread, isCompaction);
|
||||
}
|
||||
|
|
|
@ -298,7 +298,7 @@ public class HFileWriterImpl implements HFile.Writer {
|
|||
if (blockWriter != null) {
|
||||
throw new IllegalStateException("finishInit called twice");
|
||||
}
|
||||
blockWriter = new HFileBlock.Writer(blockEncoder, hFileContext,
|
||||
blockWriter = new HFileBlock.Writer(conf, blockEncoder, hFileContext,
|
||||
cacheConf.getByteBuffAllocator());
|
||||
// Data block index writer
|
||||
boolean cacheIndexesOnWrite = cacheConf.shouldCacheIndexesOnWrite();
|
||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.io.hfile;
|
|||
import java.io.DataOutputStream;
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
|
||||
|
@ -81,14 +82,15 @@ public class NoOpDataBlockEncoder implements HFileDataBlockEncoder {
|
|||
}
|
||||
|
||||
@Override
|
||||
public HFileBlockEncodingContext newDataBlockEncodingContext(
|
||||
public HFileBlockEncodingContext newDataBlockEncodingContext(Configuration conf,
|
||||
byte[] dummyHeader, HFileContext meta) {
|
||||
return new HFileBlockDefaultEncodingContext(null, dummyHeader, meta);
|
||||
return new HFileBlockDefaultEncodingContext(conf, null, dummyHeader, meta);
|
||||
}
|
||||
|
||||
@Override
|
||||
public HFileBlockDecodingContext newDataBlockDecodingContext(HFileContext meta) {
|
||||
return new HFileBlockDefaultDecodingContext(meta);
|
||||
public HFileBlockDecodingContext newDataBlockDecodingContext(Configuration conf,
|
||||
HFileContext meta) {
|
||||
return new HFileBlockDefaultDecodingContext(conf, meta);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -136,7 +136,7 @@ public class HFileProcedurePrettyPrinter extends AbstractHBaseTool {
|
|||
out.println("Scanning -> " + file);
|
||||
FileSystem fs = file.getFileSystem(conf);
|
||||
try (HFile.Reader reader = HFile.createReader(fs, file, CacheConfig.DISABLED, true, conf);
|
||||
HFileScanner scanner = reader.getScanner(false, false, false)) {
|
||||
HFileScanner scanner = reader.getScanner(conf, false, false, false)) {
|
||||
if (procId != null) {
|
||||
if (scanner
|
||||
.seekTo(PrivateCellUtil.createFirstOnRow(Bytes.toBytes(procId.longValue()))) != -1) {
|
||||
|
|
|
@ -806,7 +806,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
|
|||
long verificationStartTime = EnvironmentEdgeManager.currentTime();
|
||||
LOG.info("Full verification started for bulk load hfile: {}", srcPath);
|
||||
Cell prevCell = null;
|
||||
HFileScanner scanner = reader.getScanner(false, false, false);
|
||||
HFileScanner scanner = reader.getScanner(conf, false, false, false);
|
||||
scanner.seekTo();
|
||||
do {
|
||||
Cell cell = scanner.getCell();
|
||||
|
|
|
@ -75,6 +75,7 @@ public class StoreFileReader {
|
|||
private KeyValue.KeyOnlyKeyValue lastBloomKeyOnlyKV = null;
|
||||
private boolean skipResetSeqId = true;
|
||||
private int prefixLength = -1;
|
||||
protected Configuration conf;
|
||||
|
||||
// Counter that is incremented every time a scanner is created on the
|
||||
// store file. It is decremented when the scan on the store file is
|
||||
|
@ -82,16 +83,18 @@ public class StoreFileReader {
|
|||
private final AtomicInteger refCount;
|
||||
private final ReaderContext context;
|
||||
|
||||
private StoreFileReader(HFile.Reader reader, AtomicInteger refCount, ReaderContext context) {
|
||||
private StoreFileReader(HFile.Reader reader, AtomicInteger refCount, ReaderContext context,
|
||||
Configuration conf) {
|
||||
this.reader = reader;
|
||||
bloomFilterType = BloomType.NONE;
|
||||
this.refCount = refCount;
|
||||
this.context = context;
|
||||
this.conf = conf;
|
||||
}
|
||||
|
||||
public StoreFileReader(ReaderContext context, HFileInfo fileInfo, CacheConfig cacheConf,
|
||||
AtomicInteger refCount, Configuration conf) throws IOException {
|
||||
this(HFile.createReader(context, fileInfo, cacheConf, conf), refCount, context);
|
||||
this(HFile.createReader(context, fileInfo, cacheConf, conf), refCount, context, conf);
|
||||
}
|
||||
|
||||
void copyFields(StoreFileReader storeFileReader) throws IOException {
|
||||
|
@ -205,7 +208,7 @@ public class StoreFileReader {
|
|||
@Deprecated
|
||||
public HFileScanner getScanner(boolean cacheBlocks, boolean pread,
|
||||
boolean isCompaction) {
|
||||
return reader.getScanner(cacheBlocks, pread, isCompaction);
|
||||
return reader.getScanner(conf, cacheBlocks, pread, isCompaction);
|
||||
}
|
||||
|
||||
public void close(boolean evictOnClose) throws IOException {
|
||||
|
|
|
@ -135,7 +135,7 @@ public class CompressionTest {
|
|||
Cell cc = null;
|
||||
HFile.Reader reader = HFile.createReader(fs, path, CacheConfig.DISABLED, true, conf);
|
||||
try {
|
||||
HFileScanner scanner = reader.getScanner(false, true);
|
||||
HFileScanner scanner = reader.getScanner(conf, false, true);
|
||||
scanner.seekTo(); // position to the start of file
|
||||
// Scanner does not do Cells yet. Do below for now till fixed.
|
||||
cc = scanner.getCell();
|
||||
|
|
|
@ -424,7 +424,7 @@ public class HFilePerformanceEvaluation {
|
|||
@Override
|
||||
void setUp() throws Exception {
|
||||
super.setUp();
|
||||
this.scanner = this.reader.getScanner(false, false);
|
||||
this.scanner = this.reader.getScanner(conf, false, false);
|
||||
this.scanner.seekTo();
|
||||
}
|
||||
|
||||
|
@ -456,7 +456,7 @@ public class HFilePerformanceEvaluation {
|
|||
|
||||
@Override
|
||||
void doRow(int i) throws Exception {
|
||||
HFileScanner scanner = this.reader.getScanner(false, true);
|
||||
HFileScanner scanner = this.reader.getScanner(conf, false, true);
|
||||
byte [] b = getRandomRow();
|
||||
if (scanner.seekTo(createCell(b)) < 0) {
|
||||
LOG.info("Not able to seekTo " + new String(b));
|
||||
|
@ -483,7 +483,7 @@ public class HFilePerformanceEvaluation {
|
|||
|
||||
@Override
|
||||
void doRow(int i) throws Exception {
|
||||
HFileScanner scanner = this.reader.getScanner(false, false);
|
||||
HFileScanner scanner = this.reader.getScanner(conf, false, false);
|
||||
byte [] b = getRandomRow();
|
||||
// System.out.println("Random row: " + new String(b));
|
||||
Cell c = createCell(b);
|
||||
|
@ -522,7 +522,7 @@ public class HFilePerformanceEvaluation {
|
|||
|
||||
@Override
|
||||
void doRow(int i) throws Exception {
|
||||
HFileScanner scanner = this.reader.getScanner(false, true);
|
||||
HFileScanner scanner = this.reader.getScanner(conf, false, true);
|
||||
byte[] gaussianRandomRowBytes = getGaussianRandomRowBytes();
|
||||
scanner.seekTo(createCell(gaussianRandomRowBytes));
|
||||
for (int ii = 0; ii < 30; ii++) {
|
||||
|
|
|
@ -36,38 +36,32 @@ import org.apache.hadoop.hbase.io.hfile.HFile;
|
|||
import org.apache.hadoop.hbase.io.hfile.HFileContext;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
|
||||
import org.apache.hadoop.hbase.logging.Log4jUtils;
|
||||
import org.apache.hadoop.hbase.util.RedundantKVGenerator;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class HFileTestBase {
|
||||
|
||||
static {
|
||||
Log4jUtils.setLogLevel("org.apache.hadoop.hbase.io.compress", "TRACE");
|
||||
}
|
||||
|
||||
protected static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||
protected static final Logger LOG = LoggerFactory.getLogger(HFileTestBase.class);
|
||||
protected static final SecureRandom RNG = new SecureRandom();
|
||||
protected static FileSystem fs;
|
||||
protected static FileSystem FS;
|
||||
|
||||
public static void setUpBeforeClass() throws Exception {
|
||||
Configuration conf = TEST_UTIL.getConfiguration();
|
||||
// Disable block cache in this test.
|
||||
conf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);
|
||||
conf.setInt("hfile.format.version", 3);
|
||||
fs = FileSystem.get(conf);
|
||||
FS = FileSystem.get(conf);
|
||||
}
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
public void doTest(Compression.Algorithm compression) throws Exception {
|
||||
public void doTest(Configuration conf, Path path, Compression.Algorithm compression)
|
||||
throws Exception {
|
||||
// Create 10000 random test KVs
|
||||
RedundantKVGenerator generator = new RedundantKVGenerator();
|
||||
List<KeyValue> testKvs = generator.generateTestKeyValues(10000);
|
||||
|
||||
// Iterate through data block encoding and compression combinations
|
||||
Configuration conf = TEST_UTIL.getConfiguration();
|
||||
CacheConfig cacheConf = new CacheConfig(conf);
|
||||
HFileContext fileContext = new HFileContextBuilder()
|
||||
.withBlockSize(4096) // small block
|
||||
|
@ -75,9 +69,7 @@ public class HFileTestBase {
|
|||
.build();
|
||||
// write a new test HFile
|
||||
LOG.info("Writing with " + fileContext);
|
||||
Path path = new Path(TEST_UTIL.getDataTestDir(),
|
||||
HBaseTestingUtility.getRandomUUID().toString() + ".hfile");
|
||||
FSDataOutputStream out = fs.create(path);
|
||||
FSDataOutputStream out = FS.create(path);
|
||||
HFile.Writer writer = HFile.getWriterFactory(conf, cacheConf)
|
||||
.withOutputStream(out)
|
||||
.withFileContext(fileContext)
|
||||
|
@ -95,9 +87,9 @@ public class HFileTestBase {
|
|||
LOG.info("Reading with " + fileContext);
|
||||
int i = 0;
|
||||
HFileScanner scanner = null;
|
||||
HFile.Reader reader = HFile.createReader(fs, path, cacheConf, true, conf);
|
||||
HFile.Reader reader = HFile.createReader(FS, path, cacheConf, true, conf);
|
||||
try {
|
||||
scanner = reader.getScanner(false, false);
|
||||
scanner = reader.getScanner(conf, false, false);
|
||||
assertTrue("Initial seekTo failed", scanner.seekTo());
|
||||
do {
|
||||
Cell kv = scanner.getCell();
|
||||
|
@ -114,9 +106,9 @@ public class HFileTestBase {
|
|||
|
||||
// Test random seeks with pread
|
||||
LOG.info("Random seeking with " + fileContext);
|
||||
reader = HFile.createReader(fs, path, cacheConf, true, conf);
|
||||
reader = HFile.createReader(FS, path, cacheConf, true, conf);
|
||||
try {
|
||||
scanner = reader.getScanner(false, true);
|
||||
scanner = reader.getScanner(conf, false, true);
|
||||
assertTrue("Initial seekTo failed", scanner.seekTo());
|
||||
for (i = 0; i < 100; i++) {
|
||||
KeyValue kv = testKvs.get(RNG.nextInt(testKvs.size()));
|
||||
|
|
|
@ -30,11 +30,14 @@ import java.util.ArrayList;
|
|||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Random;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.ArrayBackedTag;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.CellComparatorImpl;
|
||||
import org.apache.hadoop.hbase.CellUtil;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
|
@ -82,9 +85,9 @@ public class TestDataBlockEncoders {
|
|||
+ DataBlockEncoding.ID_SIZE;
|
||||
static final byte[] HFILEBLOCK_DUMMY_HEADER = new byte[HConstants.HFILEBLOCK_HEADER_SIZE];
|
||||
|
||||
private RedundantKVGenerator generator = new RedundantKVGenerator();
|
||||
private Random randomizer = new Random(42L);
|
||||
|
||||
private final Configuration conf = HBaseConfiguration.create();
|
||||
private final RedundantKVGenerator generator = new RedundantKVGenerator();
|
||||
private final Random randomizer = new Random(42L);
|
||||
private final boolean includesMemstoreTS;
|
||||
private final boolean includesTags;
|
||||
private final boolean useOffheapData;
|
||||
|
@ -101,8 +104,8 @@ public class TestDataBlockEncoders {
|
|||
this.useOffheapData = useOffheapData;
|
||||
}
|
||||
|
||||
private HFileBlockEncodingContext getEncodingContext(Compression.Algorithm algo,
|
||||
DataBlockEncoding encoding) {
|
||||
private HFileBlockEncodingContext getEncodingContext(Configuration conf,
|
||||
Compression.Algorithm algo, DataBlockEncoding encoding) {
|
||||
DataBlockEncoder encoder = encoding.getEncoder();
|
||||
HFileContext meta = new HFileContextBuilder()
|
||||
.withHBaseCheckSum(false)
|
||||
|
@ -110,9 +113,9 @@ public class TestDataBlockEncoders {
|
|||
.withIncludesTags(includesTags)
|
||||
.withCompression(algo).build();
|
||||
if (encoder != null) {
|
||||
return encoder.newDataBlockEncodingContext(encoding, HFILEBLOCK_DUMMY_HEADER, meta);
|
||||
return encoder.newDataBlockEncodingContext(conf, encoding, HFILEBLOCK_DUMMY_HEADER, meta);
|
||||
} else {
|
||||
return new HFileBlockDefaultEncodingContext(encoding, HFILEBLOCK_DUMMY_HEADER, meta);
|
||||
return new HFileBlockDefaultEncodingContext(conf, encoding, HFILEBLOCK_DUMMY_HEADER, meta);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -199,7 +202,7 @@ public class TestDataBlockEncoders {
|
|||
}
|
||||
LOG.info("Encoder: " + encoder);
|
||||
ByteBuffer encodedBuffer = encodeKeyValues(encoding, sampleKv,
|
||||
getEncodingContext(Compression.Algorithm.NONE, encoding), this.useOffheapData);
|
||||
getEncodingContext(conf, Compression.Algorithm.NONE, encoding), this.useOffheapData);
|
||||
HFileContext meta = new HFileContextBuilder()
|
||||
.withHBaseCheckSum(false)
|
||||
.withIncludesMvcc(includesMemstoreTS)
|
||||
|
@ -207,7 +210,7 @@ public class TestDataBlockEncoders {
|
|||
.withCompression(Compression.Algorithm.NONE)
|
||||
.build();
|
||||
DataBlockEncoder.EncodedSeeker seeker =
|
||||
encoder.createSeeker(encoder.newDataBlockDecodingContext(meta));
|
||||
encoder.createSeeker(encoder.newDataBlockDecodingContext(conf, meta));
|
||||
seeker.setCurrentBuffer(new SingleByteBuff(encodedBuffer));
|
||||
encodedSeekers.add(seeker);
|
||||
}
|
||||
|
@ -272,7 +275,7 @@ public class TestDataBlockEncoders {
|
|||
}
|
||||
DataBlockEncoder encoder = encoding.getEncoder();
|
||||
ByteBuffer encodedBuffer = encodeKeyValues(encoding, sampleKv,
|
||||
getEncodingContext(Compression.Algorithm.NONE, encoding), this.useOffheapData);
|
||||
getEncodingContext(conf, Compression.Algorithm.NONE, encoding), this.useOffheapData);
|
||||
HFileContext meta = new HFileContextBuilder()
|
||||
.withHBaseCheckSum(false)
|
||||
.withIncludesMvcc(includesMemstoreTS)
|
||||
|
@ -280,7 +283,7 @@ public class TestDataBlockEncoders {
|
|||
.withCompression(Compression.Algorithm.NONE)
|
||||
.build();
|
||||
DataBlockEncoder.EncodedSeeker seeker =
|
||||
encoder.createSeeker(encoder.newDataBlockDecodingContext(meta));
|
||||
encoder.createSeeker(encoder.newDataBlockDecodingContext(conf, meta));
|
||||
seeker.setCurrentBuffer(new SingleByteBuff(encodedBuffer));
|
||||
int i = 0;
|
||||
do {
|
||||
|
@ -315,7 +318,7 @@ public class TestDataBlockEncoders {
|
|||
}
|
||||
DataBlockEncoder encoder = encoding.getEncoder();
|
||||
ByteBuffer encodedBuffer = encodeKeyValues(encoding, sampleKv,
|
||||
getEncodingContext(Compression.Algorithm.NONE, encoding), this.useOffheapData);
|
||||
getEncodingContext(conf, Compression.Algorithm.NONE, encoding), this.useOffheapData);
|
||||
Cell key = encoder.getFirstKeyCellInBlock(new SingleByteBuff(encodedBuffer));
|
||||
KeyValue firstKv = sampleKv.get(0);
|
||||
if (0 != PrivateCellUtil.compareKeyIgnoresMvcc(CellComparatorImpl.COMPARATOR, key, firstKv)) {
|
||||
|
@ -336,13 +339,13 @@ public class TestDataBlockEncoders {
|
|||
kvList.add(expectedKV);
|
||||
DataBlockEncoding encoding = DataBlockEncoding.ROW_INDEX_V1;
|
||||
DataBlockEncoder encoder = encoding.getEncoder();
|
||||
ByteBuffer encodedBuffer =
|
||||
encodeKeyValues(encoding, kvList, getEncodingContext(Algorithm.NONE, encoding), false);
|
||||
ByteBuffer encodedBuffer = encodeKeyValues(encoding, kvList,
|
||||
getEncodingContext(conf, Algorithm.NONE, encoding), false);
|
||||
HFileContext meta =
|
||||
new HFileContextBuilder().withHBaseCheckSum(false).withIncludesMvcc(includesMemstoreTS)
|
||||
.withIncludesTags(includesTags).withCompression(Compression.Algorithm.NONE).build();
|
||||
DataBlockEncoder.EncodedSeeker seeker =
|
||||
encoder.createSeeker(encoder.newDataBlockDecodingContext(meta));
|
||||
encoder.createSeeker(encoder.newDataBlockDecodingContext(conf, meta));
|
||||
seeker.setCurrentBuffer(new SingleByteBuff(encodedBuffer));
|
||||
Cell cell = seeker.getCell();
|
||||
Assert.assertEquals(expectedKV.getLength(), ((KeyValue) cell).getLength());
|
||||
|
@ -393,9 +396,9 @@ public class TestDataBlockEncoders {
|
|||
if (encoder == null) {
|
||||
continue;
|
||||
}
|
||||
HFileBlockEncodingContext encodingContext = new HFileBlockDefaultEncodingContext(encoding,
|
||||
HFILEBLOCK_DUMMY_HEADER, fileContext);
|
||||
|
||||
HFileBlockEncodingContext encodingContext =
|
||||
new HFileBlockDefaultEncodingContext(conf, encoding, HFILEBLOCK_DUMMY_HEADER,
|
||||
fileContext);
|
||||
ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
||||
baos.write(HFILEBLOCK_DUMMY_HEADER);
|
||||
DataOutputStream dos = new DataOutputStream(baos);
|
||||
|
@ -441,7 +444,7 @@ public class TestDataBlockEncoders {
|
|||
HFileContext meta = new HFileContextBuilder().withHBaseCheckSum(false)
|
||||
.withIncludesMvcc(includesMemstoreTS).withIncludesTags(includesTags)
|
||||
.withCompression(Compression.Algorithm.NONE).build();
|
||||
actualDataset = encoder.decodeKeyValues(dis, encoder.newDataBlockDecodingContext(meta));
|
||||
actualDataset = encoder.decodeKeyValues(dis, encoder.newDataBlockDecodingContext(conf, meta));
|
||||
actualDataset.rewind();
|
||||
|
||||
// this is because in case of prefix tree the decoded stream will not have
|
||||
|
|
|
@ -24,9 +24,12 @@ import java.nio.ByteBuffer;
|
|||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.PrivateCellUtil;
|
||||
|
@ -54,6 +57,7 @@ public class TestSeekToBlockWithEncoders {
|
|||
|
||||
static final byte[] HFILEBLOCK_DUMMY_HEADER = new byte[HConstants.HFILEBLOCK_HEADER_SIZE];
|
||||
private final boolean useOffheapData;
|
||||
private final Configuration conf = HBaseConfiguration.create();
|
||||
|
||||
@Parameters
|
||||
public static Collection<Object[]> parameters() {
|
||||
|
@ -285,12 +289,12 @@ public class TestSeekToBlockWithEncoders {
|
|||
HFileContext meta = new HFileContextBuilder().withHBaseCheckSum(false)
|
||||
.withIncludesMvcc(false).withIncludesTags(false)
|
||||
.withCompression(Compression.Algorithm.NONE).build();
|
||||
HFileBlockEncodingContext encodingContext = encoder.newDataBlockEncodingContext(encoding,
|
||||
HFILEBLOCK_DUMMY_HEADER, meta);
|
||||
HFileBlockEncodingContext encodingContext = encoder.newDataBlockEncodingContext(conf,
|
||||
encoding, HFILEBLOCK_DUMMY_HEADER, meta);
|
||||
ByteBuffer encodedBuffer = TestDataBlockEncoders.encodeKeyValues(encoding, kvs,
|
||||
encodingContext, this.useOffheapData);
|
||||
DataBlockEncoder.EncodedSeeker seeker =
|
||||
encoder.createSeeker(encoder.newDataBlockDecodingContext(meta));
|
||||
encoder.createSeeker(encoder.newDataBlockDecodingContext(conf, meta));
|
||||
seeker.setCurrentBuffer(new SingleByteBuff(encodedBuffer));
|
||||
encodedSeekers.add(seeker);
|
||||
}
|
||||
|
|
|
@ -283,7 +283,7 @@ public class TestCacheOnWrite {
|
|||
.withIncludesTags(useTags).build();
|
||||
final boolean cacheBlocks = false;
|
||||
final boolean pread = false;
|
||||
HFileScanner scanner = reader.getScanner(cacheBlocks, pread);
|
||||
HFileScanner scanner = reader.getScanner(conf, cacheBlocks, pread);
|
||||
assertTrue(testDescription, scanner.seekTo());
|
||||
|
||||
long offset = 0;
|
||||
|
|
|
@ -30,6 +30,7 @@ import java.io.IOException;
|
|||
import java.nio.BufferUnderflowException;
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
|
@ -85,7 +86,7 @@ public class TestChecksum {
|
|||
Path path = new Path(TEST_UTIL.getDataTestDir(), "default_checksum");
|
||||
FSDataOutputStream os = fs.create(path);
|
||||
HFileContext meta = new HFileContextBuilder().build();
|
||||
HFileBlock.Writer hbw = new HFileBlock.Writer(null, meta);
|
||||
HFileBlock.Writer hbw = new HFileBlock.Writer(TEST_UTIL.getConfiguration(), null, meta);
|
||||
DataOutputStream dos = hbw.startWriting(BlockType.DATA);
|
||||
for (int i = 0; i < 1000; ++i)
|
||||
dos.writeInt(i);
|
||||
|
@ -105,7 +106,7 @@ public class TestChecksum {
|
|||
.withFilePath(path)
|
||||
.build();
|
||||
HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(context,
|
||||
meta, ByteBuffAllocator.HEAP);
|
||||
meta, ByteBuffAllocator.HEAP, TEST_UTIL.getConfiguration());
|
||||
HFileBlock b = hbr.readBlockData(0, -1, false, false, true);
|
||||
assertTrue(!b.isSharedMem());
|
||||
assertEquals(b.getChecksumType(), ChecksumType.getDefaultChecksumType().getCode());
|
||||
|
@ -137,7 +138,7 @@ public class TestChecksum {
|
|||
HFileContext meta = new HFileContextBuilder()
|
||||
.withChecksumType(ckt)
|
||||
.build();
|
||||
HFileBlock.Writer hbw = new HFileBlock.Writer(null, meta);
|
||||
HFileBlock.Writer hbw = new HFileBlock.Writer(TEST_UTIL.getConfiguration(), null, meta);
|
||||
DataOutputStream dos = hbw.startWriting(BlockType.DATA);
|
||||
for (int i = 0; i < intCount; ++i) {
|
||||
dos.writeInt(i);
|
||||
|
@ -158,7 +159,7 @@ public class TestChecksum {
|
|||
.withFilePath(path)
|
||||
.build();
|
||||
HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(context,
|
||||
meta, ByteBuffAllocator.HEAP);
|
||||
meta, ByteBuffAllocator.HEAP, TEST_UTIL.getConfiguration());
|
||||
HFileBlock b = hbr.readBlockData(0, -1, false, false, true);
|
||||
assertTrue(!b.isSharedMem());
|
||||
|
||||
|
@ -206,7 +207,7 @@ public class TestChecksum {
|
|||
.withIncludesTags(useTags)
|
||||
.withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
|
||||
.build();
|
||||
HFileBlock.Writer hbw = new HFileBlock.Writer(null, meta);
|
||||
HFileBlock.Writer hbw = new HFileBlock.Writer(TEST_UTIL.getConfiguration(), null, meta);
|
||||
long totalSize = 0;
|
||||
for (int blockId = 0; blockId < 2; ++blockId) {
|
||||
DataOutputStream dos = hbw.startWriting(BlockType.DATA);
|
||||
|
@ -234,7 +235,8 @@ public class TestChecksum {
|
|||
.withFileSystem(fs)
|
||||
.withFilePath(path)
|
||||
.build();
|
||||
HFileBlock.FSReader hbr = new CorruptedFSReaderImpl(context, meta);
|
||||
HFileBlock.FSReader hbr = new CorruptedFSReaderImpl(context, meta,
|
||||
TEST_UTIL.getConfiguration());
|
||||
HFileBlock b = hbr.readBlockData(0, -1, pread, false, true);
|
||||
b.sanityCheck();
|
||||
assertEquals(4936, b.getUncompressedSizeWithoutHeader());
|
||||
|
@ -276,7 +278,8 @@ public class TestChecksum {
|
|||
// Now, use a completely new reader. Switch off hbase checksums in
|
||||
// the configuration. In this case, we should not detect
|
||||
// any retries within hbase.
|
||||
HFileSystem newfs = new HFileSystem(TEST_UTIL.getConfiguration(), false);
|
||||
Configuration conf = TEST_UTIL.getConfiguration();
|
||||
HFileSystem newfs = new HFileSystem(conf, false);
|
||||
assertEquals(false, newfs.useHBaseChecksum());
|
||||
is = new FSDataInputStreamWrapper(newfs, path);
|
||||
context = new ReaderContextBuilder()
|
||||
|
@ -285,7 +288,7 @@ public class TestChecksum {
|
|||
.withFileSystem(newfs)
|
||||
.withFilePath(path)
|
||||
.build();
|
||||
hbr = new CorruptedFSReaderImpl(context, meta);
|
||||
hbr = new CorruptedFSReaderImpl(context, meta, conf);
|
||||
b = hbr.readBlockData(0, -1, pread, false, true);
|
||||
is.close();
|
||||
b.sanityCheck();
|
||||
|
@ -329,7 +332,7 @@ public class TestChecksum {
|
|||
.withHBaseCheckSum(true)
|
||||
.withBytesPerCheckSum(bytesPerChecksum)
|
||||
.build();
|
||||
HFileBlock.Writer hbw = new HFileBlock.Writer(null,
|
||||
HFileBlock.Writer hbw = new HFileBlock.Writer(TEST_UTIL.getConfiguration(), null,
|
||||
meta);
|
||||
|
||||
// write one block. The block has data
|
||||
|
@ -373,7 +376,8 @@ public class TestChecksum {
|
|||
.withFilePath(path)
|
||||
.build();
|
||||
HFileBlock.FSReader hbr =
|
||||
new HFileBlock.FSReaderImpl(context, meta, ByteBuffAllocator.HEAP);
|
||||
new HFileBlock.FSReaderImpl(context, meta, ByteBuffAllocator.HEAP,
|
||||
TEST_UTIL.getConfiguration());
|
||||
HFileBlock b = hbr.readBlockData(0, -1, pread, false, true);
|
||||
assertTrue(b.getBufferReadOnly() instanceof SingleByteBuff);
|
||||
is.close();
|
||||
|
@ -413,8 +417,9 @@ public class TestChecksum {
|
|||
*/
|
||||
boolean corruptDataStream = false;
|
||||
|
||||
public CorruptedFSReaderImpl(ReaderContext context, HFileContext meta) throws IOException {
|
||||
super(context, meta, ByteBuffAllocator.HEAP);
|
||||
public CorruptedFSReaderImpl(ReaderContext context, HFileContext meta, Configuration conf)
|
||||
throws IOException {
|
||||
super(context, meta, ByteBuffAllocator.HEAP, conf);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -528,7 +528,7 @@ public class TestHFile {
|
|||
System.out.println(cacheConf.toString());
|
||||
// Load up the index.
|
||||
// Get a scanner that caches and that does not use pread.
|
||||
HFileScanner scanner = reader.getScanner(true, false);
|
||||
HFileScanner scanner = reader.getScanner(conf, true, false);
|
||||
// Align scanner at start of the file.
|
||||
scanner.seekTo();
|
||||
readAllRecords(scanner);
|
||||
|
@ -617,7 +617,7 @@ public class TestHFile {
|
|||
ReaderContext context = new ReaderContextBuilder().withFileSystemAndPath(fs, mFile).build();
|
||||
Reader reader = createReaderFromStream(context, cacheConf, conf);
|
||||
// No data -- this should return false.
|
||||
assertFalse(reader.getScanner(false, false).seekTo());
|
||||
assertFalse(reader.getScanner(conf, false, false).seekTo());
|
||||
someReadingWithMetaBlock(reader);
|
||||
fs.delete(mFile, true);
|
||||
reader.close();
|
||||
|
|
|
@ -269,7 +269,7 @@ public class TestHFileBlock {
|
|||
.withIncludesTags(includesTag)
|
||||
.withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
|
||||
.build();
|
||||
HFileBlock.Writer hbw = new HFileBlock.Writer(null, meta);
|
||||
HFileBlock.Writer hbw = new HFileBlock.Writer(TEST_UTIL.getConfiguration(), null, meta);
|
||||
DataOutputStream dos = hbw.startWriting(blockType);
|
||||
writeTestBlockContents(dos);
|
||||
dos.flush();
|
||||
|
@ -351,8 +351,9 @@ public class TestHFileBlock {
|
|||
}
|
||||
|
||||
protected void testReaderV2Internals() throws IOException {
|
||||
if(includesTag) {
|
||||
TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
|
||||
final Configuration conf = TEST_UTIL.getConfiguration();
|
||||
if (includesTag) {
|
||||
conf.setInt("hfile.format.version", 3);
|
||||
}
|
||||
for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
|
||||
for (boolean pread : new boolean[] { false, true }) {
|
||||
|
@ -367,7 +368,7 @@ public class TestHFileBlock {
|
|||
.withIncludesTags(includesTag)
|
||||
.withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
|
||||
.build();
|
||||
HFileBlock.Writer hbw = new HFileBlock.Writer(null,
|
||||
HFileBlock.Writer hbw = new HFileBlock.Writer(conf, null,
|
||||
meta);
|
||||
long totalSize = 0;
|
||||
for (int blockId = 0; blockId < 2; ++blockId) {
|
||||
|
@ -391,7 +392,8 @@ public class TestHFileBlock {
|
|||
.withFilePath(path)
|
||||
.withFileSystem(fs)
|
||||
.build();
|
||||
HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(context, meta, alloc);
|
||||
HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(context, meta, alloc,
|
||||
TEST_UTIL.getConfiguration());
|
||||
HFileBlock b = hbr.readBlockData(0, -1, pread, false, true);
|
||||
is.close();
|
||||
assertEquals(0, HFile.getAndResetChecksumFailuresCount());
|
||||
|
@ -410,7 +412,8 @@ public class TestHFileBlock {
|
|||
.withFilePath(path)
|
||||
.withFileSystem(fs)
|
||||
.build();
|
||||
hbr = new HFileBlock.FSReaderImpl(readerContext, meta, alloc);
|
||||
hbr = new HFileBlock.FSReaderImpl(readerContext, meta, alloc,
|
||||
TEST_UTIL.getConfiguration());
|
||||
b = hbr.readBlockData(0,
|
||||
2173 + HConstants.HFILEBLOCK_HEADER_SIZE + b.totalChecksumBytes(), pread, false, true);
|
||||
assertEquals(expected, b);
|
||||
|
@ -444,8 +447,9 @@ public class TestHFileBlock {
|
|||
|
||||
private void testInternals() throws IOException {
|
||||
final int numBlocks = 5;
|
||||
final Configuration conf = TEST_UTIL.getConfiguration();
|
||||
if(includesTag) {
|
||||
TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
|
||||
conf.setInt("hfile.format.version", 3);
|
||||
}
|
||||
for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
|
||||
for (boolean pread : new boolean[] { false, true }) {
|
||||
|
@ -463,7 +467,7 @@ public class TestHFileBlock {
|
|||
.withIncludesTags(includesTag)
|
||||
.withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
|
||||
.build();
|
||||
HFileBlock.Writer hbw = new HFileBlock.Writer(dataBlockEncoder, meta);
|
||||
HFileBlock.Writer hbw = new HFileBlock.Writer(conf, dataBlockEncoder, meta);
|
||||
long totalSize = 0;
|
||||
final List<Integer> encodedSizes = new ArrayList<>();
|
||||
final List<ByteBuff> encodedBlocks = new ArrayList<>();
|
||||
|
@ -500,8 +504,8 @@ public class TestHFileBlock {
|
|||
.withFileSystem(fs)
|
||||
.build();
|
||||
HFileBlock.FSReaderImpl hbr =
|
||||
new HFileBlock.FSReaderImpl(context, meta, alloc);
|
||||
hbr.setDataBlockEncoder(dataBlockEncoder);
|
||||
new HFileBlock.FSReaderImpl(context, meta, alloc, conf);
|
||||
hbr.setDataBlockEncoder(dataBlockEncoder, conf);
|
||||
hbr.setIncludesMemStoreTS(includesMemstoreTS);
|
||||
HFileBlock blockFromHFile, blockUnpacked;
|
||||
int pos = 0;
|
||||
|
@ -609,6 +613,7 @@ public class TestHFileBlock {
|
|||
|
||||
protected void testPreviousOffsetInternals() throws IOException {
|
||||
// TODO: parameterize these nested loops.
|
||||
Configuration conf = TEST_UTIL.getConfiguration();
|
||||
for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
|
||||
for (boolean pread : BOOLEAN_VALUES) {
|
||||
for (boolean cacheOnWrite : BOOLEAN_VALUES) {
|
||||
|
@ -620,8 +625,8 @@ public class TestHFileBlock {
|
|||
List<Long> expectedPrevOffsets = new ArrayList<>();
|
||||
List<BlockType> expectedTypes = new ArrayList<>();
|
||||
List<ByteBuffer> expectedContents = cacheOnWrite ? new ArrayList<>() : null;
|
||||
long totalSize = writeBlocks(rand, algo, path, expectedOffsets,
|
||||
expectedPrevOffsets, expectedTypes, expectedContents);
|
||||
long totalSize = writeBlocks(TEST_UTIL.getConfiguration(), rand, algo, path,
|
||||
expectedOffsets, expectedPrevOffsets, expectedTypes, expectedContents);
|
||||
|
||||
FSDataInputStream is = fs.open(path);
|
||||
HFileContext meta = new HFileContextBuilder()
|
||||
|
@ -635,8 +640,7 @@ public class TestHFileBlock {
|
|||
.withFilePath(path)
|
||||
.withFileSystem(fs)
|
||||
.build();
|
||||
HFileBlock.FSReader hbr =
|
||||
new HFileBlock.FSReaderImpl(context, meta, alloc);
|
||||
HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(context, meta, alloc, conf);
|
||||
long curOffset = 0;
|
||||
for (int i = 0; i < NUM_TEST_BLOCKS; ++i) {
|
||||
if (!pread) {
|
||||
|
@ -819,12 +823,14 @@ public class TestHFileBlock {
|
|||
|
||||
protected void testConcurrentReadingInternals() throws IOException,
|
||||
InterruptedException, ExecutionException {
|
||||
Configuration conf = TEST_UTIL.getConfiguration();
|
||||
for (Compression.Algorithm compressAlgo : COMPRESSION_ALGORITHMS) {
|
||||
Path path = new Path(TEST_UTIL.getDataTestDir(), "concurrent_reading");
|
||||
Random rand = defaultRandom();
|
||||
List<Long> offsets = new ArrayList<>();
|
||||
List<BlockType> types = new ArrayList<>();
|
||||
writeBlocks(rand, compressAlgo, path, offsets, null, types, null);
|
||||
writeBlocks(TEST_UTIL.getConfiguration(), rand, compressAlgo, path, offsets, null,
|
||||
types, null);
|
||||
FSDataInputStream is = fs.open(path);
|
||||
long fileSize = fs.getFileStatus(path).getLen();
|
||||
HFileContext meta = new HFileContextBuilder()
|
||||
|
@ -839,7 +845,7 @@ public class TestHFileBlock {
|
|||
.withFilePath(path)
|
||||
.withFileSystem(fs)
|
||||
.build();
|
||||
HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(context, meta, alloc);
|
||||
HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(context, meta, alloc, conf);
|
||||
|
||||
Executor exec = Executors.newFixedThreadPool(NUM_READER_THREADS);
|
||||
ExecutorCompletionService<Boolean> ecs = new ExecutorCompletionService<>(exec);
|
||||
|
@ -862,7 +868,7 @@ public class TestHFileBlock {
|
|||
}
|
||||
}
|
||||
|
||||
private long writeBlocks(Random rand, Compression.Algorithm compressAlgo,
|
||||
private long writeBlocks(Configuration conf, Random rand, Compression.Algorithm compressAlgo,
|
||||
Path path, List<Long> expectedOffsets, List<Long> expectedPrevOffsets,
|
||||
List<BlockType> expectedTypes, List<ByteBuffer> expectedContents
|
||||
) throws IOException {
|
||||
|
@ -875,7 +881,7 @@ public class TestHFileBlock {
|
|||
.withCompression(compressAlgo)
|
||||
.withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
|
||||
.build();
|
||||
HFileBlock.Writer hbw = new HFileBlock.Writer(null, meta);
|
||||
HFileBlock.Writer hbw = new HFileBlock.Writer(conf, null, meta);
|
||||
Map<BlockType, Long> prevOffsetByType = new HashMap<>();
|
||||
long totalSize = 0;
|
||||
for (int i = 0; i < NUM_TEST_BLOCKS; ++i) {
|
||||
|
|
|
@ -210,7 +210,7 @@ public class TestHFileBlockIndex {
|
|||
.build();
|
||||
ReaderContext context = new ReaderContextBuilder().withFileSystemAndPath(fs, path).build();
|
||||
HFileBlock.FSReader blockReader = new HFileBlock.FSReaderImpl(context, meta,
|
||||
ByteBuffAllocator.HEAP);
|
||||
ByteBuffAllocator.HEAP, conf);
|
||||
|
||||
BlockReaderWrapper brw = new BlockReaderWrapper(blockReader);
|
||||
HFileBlockIndex.BlockIndexReader indexReader =
|
||||
|
@ -267,7 +267,7 @@ public class TestHFileBlockIndex {
|
|||
.withCompression(compr)
|
||||
.withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
|
||||
.build();
|
||||
HFileBlock.Writer hbw = new HFileBlock.Writer(null,
|
||||
HFileBlock.Writer hbw = new HFileBlock.Writer(TEST_UTIL.getConfiguration(), null,
|
||||
meta);
|
||||
FSDataOutputStream outputStream = fs.create(path);
|
||||
HFileBlockIndex.BlockIndexWriter biw =
|
||||
|
@ -647,7 +647,7 @@ public class TestHFileBlockIndex {
|
|||
LOG.info("Last key: " + Bytes.toStringBinary(keys[NUM_KV - 1]));
|
||||
|
||||
for (boolean pread : new boolean[] { false, true }) {
|
||||
HFileScanner scanner = reader.getScanner(true, pread);
|
||||
HFileScanner scanner = reader.getScanner(conf, true, pread);
|
||||
for (int i = 0; i < NUM_KV; ++i) {
|
||||
checkSeekTo(keys, scanner, i);
|
||||
checkKeyValue("i=" + i, keys[i], values[i],
|
||||
|
@ -770,7 +770,7 @@ public class TestHFileBlockIndex {
|
|||
|
||||
HFile.Reader reader = HFile.createReader(fs, hfPath, cacheConf, true, conf);
|
||||
// Scanner doesn't do Cells yet. Fix.
|
||||
HFileScanner scanner = reader.getScanner(true, true);
|
||||
HFileScanner scanner = reader.getScanner(conf, true, true);
|
||||
for (int i = 0; i < keys.size(); ++i) {
|
||||
scanner.seekTo(CellUtil.createCell(keys.get(i)));
|
||||
}
|
||||
|
|
|
@ -27,8 +27,11 @@ import java.nio.ByteBuffer;
|
|||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
|
||||
|
@ -58,8 +61,9 @@ public class TestHFileDataBlockEncoder {
|
|||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestHFileDataBlockEncoder.class);
|
||||
|
||||
private final Configuration conf = HBaseConfiguration.create();
|
||||
private final RedundantKVGenerator generator = new RedundantKVGenerator();
|
||||
private HFileDataBlockEncoder blockEncoder;
|
||||
private RedundantKVGenerator generator = new RedundantKVGenerator();
|
||||
private boolean includesMemstoreTS;
|
||||
|
||||
/**
|
||||
|
@ -87,7 +91,7 @@ public class TestHFileDataBlockEncoder {
|
|||
private void testEncodingWithCacheInternals(boolean useTag) throws IOException {
|
||||
List<KeyValue> kvs = generator.generateTestKeyValues(60, useTag);
|
||||
HFileBlock block = getSampleHFileBlock(kvs, useTag);
|
||||
HFileBlock cacheBlock = createBlockOnDisk(kvs, block, useTag);
|
||||
HFileBlock cacheBlock = createBlockOnDisk(conf, kvs, block, useTag);
|
||||
|
||||
LruBlockCache blockCache =
|
||||
new LruBlockCache(8 * 1024 * 1024, 32 * 1024);
|
||||
|
@ -135,7 +139,7 @@ public class TestHFileDataBlockEncoder {
|
|||
.build();
|
||||
HFileBlock block = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf),
|
||||
HFileBlock.FILL_HEADER, 0, 0, -1, hfileContext, ByteBuffAllocator.HEAP);
|
||||
HFileBlock cacheBlock = createBlockOnDisk(kvs, block, useTags);
|
||||
HFileBlock cacheBlock = createBlockOnDisk(conf, kvs, block, useTags);
|
||||
assertEquals(headerSize, cacheBlock.getDummyHeaderForVersion().length);
|
||||
}
|
||||
|
||||
|
@ -162,7 +166,7 @@ public class TestHFileDataBlockEncoder {
|
|||
HFileContext meta = new HFileContextBuilder().withIncludesMvcc(includesMemstoreTS)
|
||||
.withIncludesTags(true).withHBaseCheckSum(true).withCompression(Algorithm.NONE)
|
||||
.withBlockSize(0).withChecksumType(ChecksumType.NULL).build();
|
||||
writeBlock(kvs, meta, true);
|
||||
writeBlock(conf, kvs, meta, true);
|
||||
} catch (IllegalArgumentException e) {
|
||||
fail("No exception should have been thrown");
|
||||
}
|
||||
|
@ -172,7 +176,7 @@ public class TestHFileDataBlockEncoder {
|
|||
// usually we have just block without headers, but don't complicate that
|
||||
List<KeyValue> kvs = generator.generateTestKeyValues(60, useTag);
|
||||
HFileBlock block = getSampleHFileBlock(kvs, useTag);
|
||||
HFileBlock blockOnDisk = createBlockOnDisk(kvs, block, useTag);
|
||||
HFileBlock blockOnDisk = createBlockOnDisk(conf, kvs, block, useTag);
|
||||
|
||||
if (blockEncoder.getDataBlockEncoding() !=
|
||||
DataBlockEncoding.NONE) {
|
||||
|
@ -204,10 +208,10 @@ public class TestHFileDataBlockEncoder {
|
|||
return b;
|
||||
}
|
||||
|
||||
private HFileBlock createBlockOnDisk(List<KeyValue> kvs, HFileBlock block, boolean useTags)
|
||||
throws IOException {
|
||||
private HFileBlock createBlockOnDisk(Configuration conf, List<KeyValue> kvs, HFileBlock block,
|
||||
boolean useTags) throws IOException {
|
||||
int size;
|
||||
HFileBlockEncodingContext context = new HFileBlockDefaultEncodingContext(
|
||||
HFileBlockEncodingContext context = new HFileBlockDefaultEncodingContext(conf,
|
||||
blockEncoder.getDataBlockEncoding(), HConstants.HFILEBLOCK_DUMMY_HEADER,
|
||||
block.getHFileContext());
|
||||
|
||||
|
@ -226,9 +230,9 @@ public class TestHFileDataBlockEncoder {
|
|||
block.getOnDiskDataSizeWithHeader(), -1, block.getHFileContext(), ByteBuffAllocator.HEAP);
|
||||
}
|
||||
|
||||
private void writeBlock(List<Cell> kvs, HFileContext fileContext, boolean useTags)
|
||||
throws IOException {
|
||||
HFileBlockEncodingContext context = new HFileBlockDefaultEncodingContext(
|
||||
private void writeBlock(Configuration conf, List<Cell> kvs, HFileContext fileContext,
|
||||
boolean useTags) throws IOException {
|
||||
HFileBlockEncodingContext context = new HFileBlockDefaultEncodingContext(conf,
|
||||
blockEncoder.getDataBlockEncoding(), HConstants.HFILEBLOCK_DUMMY_HEADER,
|
||||
fileContext);
|
||||
|
||||
|
|
|
@ -93,9 +93,9 @@ public class TestHFileEncryption {
|
|||
cryptoContext.setKey(key);
|
||||
}
|
||||
|
||||
private int writeBlock(FSDataOutputStream os, HFileContext fileContext, int size)
|
||||
throws IOException {
|
||||
HFileBlock.Writer hbw = new HFileBlock.Writer(null, fileContext);
|
||||
private int writeBlock(Configuration conf, FSDataOutputStream os, HFileContext fileContext,
|
||||
int size) throws IOException {
|
||||
HFileBlock.Writer hbw = new HFileBlock.Writer(conf, null, fileContext);
|
||||
DataOutputStream dos = hbw.startWriting(BlockType.DATA);
|
||||
for (int j = 0; j < size; j++) {
|
||||
dos.writeInt(j);
|
||||
|
@ -148,7 +148,7 @@ public class TestHFileEncryption {
|
|||
FSDataOutputStream os = fs.create(path);
|
||||
try {
|
||||
for (int i = 0; i < blocks; i++) {
|
||||
totalSize += writeBlock(os, fileContext, blockSizes[i]);
|
||||
totalSize += writeBlock(TEST_UTIL.getConfiguration(), os, fileContext, blockSizes[i]);
|
||||
}
|
||||
} finally {
|
||||
os.close();
|
||||
|
@ -161,7 +161,7 @@ public class TestHFileEncryption {
|
|||
.withFileSize(totalSize).build();
|
||||
try {
|
||||
HFileBlock.FSReaderImpl hbr = new HFileBlock.FSReaderImpl(context, fileContext,
|
||||
ByteBuffAllocator.HEAP);
|
||||
ByteBuffAllocator.HEAP, TEST_UTIL.getConfiguration());
|
||||
long pos = 0;
|
||||
for (int i = 0; i < blocks; i++) {
|
||||
pos += readAndVerifyBlock(pos, fileContext, hbr, blockSizes[i]);
|
||||
|
@ -252,7 +252,7 @@ public class TestHFileEncryption {
|
|||
try {
|
||||
FixedFileTrailer trailer = reader.getTrailer();
|
||||
assertNotNull(trailer.getEncryptionKey());
|
||||
scanner = reader.getScanner(false, false);
|
||||
scanner = reader.getScanner(conf, false, false);
|
||||
assertTrue("Initial seekTo failed", scanner.seekTo());
|
||||
do {
|
||||
Cell kv = scanner.getCell();
|
||||
|
@ -271,7 +271,7 @@ public class TestHFileEncryption {
|
|||
LOG.info("Random seeking with " + fileContext);
|
||||
reader = HFile.createReader(fs, path, cacheConf, true, conf);
|
||||
try {
|
||||
scanner = reader.getScanner(false, true);
|
||||
scanner = reader.getScanner(conf, false, true);
|
||||
assertTrue("Initial seekTo failed", scanner.seekTo());
|
||||
for (i = 0; i < 100; i++) {
|
||||
KeyValue kv = testKvs.get(RNG.nextInt(testKvs.size()));
|
||||
|
|
|
@ -86,7 +86,7 @@ public class TestHFileInlineToRootChunkConversion {
|
|||
|
||||
HFile.Reader reader = HFile.createReader(fs, hfPath, cacheConf, true, conf);
|
||||
// Scanner doesn't do Cells yet. Fix.
|
||||
HFileScanner scanner = reader.getScanner(true, true);
|
||||
HFileScanner scanner = reader.getScanner(conf, true, true);
|
||||
for (int i = 0; i < keys.size(); ++i) {
|
||||
scanner.seekTo(CellUtil.createCell(keys.get(i)));
|
||||
}
|
||||
|
|
|
@ -92,7 +92,7 @@ public class TestHFileReaderImpl {
|
|||
HFile.Reader reader = HFile.createReader(fs, p, new CacheConfig(conf, bucketcache), true, conf);
|
||||
|
||||
// warm cache
|
||||
HFileScanner scanner = reader.getScanner(true, true);
|
||||
HFileScanner scanner = reader.getScanner(conf, true, true);
|
||||
scanner.seekTo(toKV("i"));
|
||||
assertEquals("i", toRowStr(scanner.getCell()));
|
||||
scanner.close();
|
||||
|
@ -102,7 +102,7 @@ public class TestHFileReaderImpl {
|
|||
}
|
||||
|
||||
// reopen again.
|
||||
scanner = reader.getScanner(true, true);
|
||||
scanner = reader.getScanner(conf, true, true);
|
||||
scanner.seekTo(toKV("i"));
|
||||
assertEquals("i", toRowStr(scanner.getCell()));
|
||||
scanner.seekBefore(toKV("i"));
|
||||
|
@ -117,7 +117,7 @@ public class TestHFileReaderImpl {
|
|||
}
|
||||
|
||||
// case 2
|
||||
scanner = reader.getScanner(true, true);
|
||||
scanner = reader.getScanner(conf, true, true);
|
||||
scanner.seekTo(toKV("i"));
|
||||
assertEquals("i", toRowStr(scanner.getCell()));
|
||||
scanner.seekBefore(toKV("c"));
|
||||
|
|
|
@ -201,7 +201,7 @@ public class TestHFileScannerImplReferenceCount {
|
|||
// We've build a HFile tree with index = 16.
|
||||
Assert.assertEquals(16, reader.getTrailer().getNumDataIndexLevels());
|
||||
|
||||
HFileScannerImpl scanner = (HFileScannerImpl) reader.getScanner(true, true, false);
|
||||
HFileScannerImpl scanner = (HFileScannerImpl) reader.getScanner(conf, true, true, false);
|
||||
HFileBlock block1 = reader.getDataBlockIndexReader()
|
||||
.loadDataBlockWithScanInfo(firstCell, null, true, true, false,
|
||||
DataBlockEncoding.NONE, reader).getHFileBlock();
|
||||
|
@ -285,7 +285,7 @@ public class TestHFileScannerImplReferenceCount {
|
|||
// We've build a HFile tree with index = 16.
|
||||
Assert.assertEquals(16, reader.getTrailer().getNumDataIndexLevels());
|
||||
|
||||
HFileScannerImpl scanner = (HFileScannerImpl) reader.getScanner(true, true, false);
|
||||
HFileScannerImpl scanner = (HFileScannerImpl) reader.getScanner(conf, true, true, false);
|
||||
HFileBlock block1 = reader.getDataBlockIndexReader()
|
||||
.loadDataBlockWithScanInfo(firstCell, null, true, true, false,
|
||||
DataBlockEncoding.NONE, reader).getHFileBlock();
|
||||
|
@ -415,7 +415,7 @@ public class TestHFileScannerImplReferenceCount {
|
|||
// We've build a HFile tree with index = 16.
|
||||
Assert.assertEquals(16, reader.getTrailer().getNumDataIndexLevels());
|
||||
|
||||
HFileScannerImpl scanner = (HFileScannerImpl) reader.getScanner(true, true, false);
|
||||
HFileScannerImpl scanner = (HFileScannerImpl) reader.getScanner(conf, true, true, false);
|
||||
HFileBlock block1 = reader.getDataBlockIndexReader()
|
||||
.loadDataBlockWithScanInfo(firstCell, null, true, true, false,
|
||||
DataBlockEncoding.NONE, reader).getHFileBlock();
|
||||
|
|
|
@ -189,7 +189,7 @@ public class TestHFileSeek extends TestCase {
|
|||
Reader reader = TestHFile.createReaderFromStream(context, new CacheConfig(conf), conf);
|
||||
KeySampler kSampler = new KeySampler(rng, ((KeyValue) reader.getFirstKey().get()).getKey(),
|
||||
((KeyValue) reader.getLastKey().get()).getKey(), keyLenGen);
|
||||
HFileScanner scanner = reader.getScanner(false, USE_PREAD);
|
||||
HFileScanner scanner = reader.getScanner(conf, false, USE_PREAD);
|
||||
BytesWritable key = new BytesWritable();
|
||||
timer.reset();
|
||||
timer.start();
|
||||
|
|
|
@ -190,7 +190,7 @@ public class TestHFileWriterV3 {
|
|||
.withFileSystem(fs)
|
||||
.withFileSize(fileSize).build();
|
||||
HFileBlock.FSReader blockReader =
|
||||
new HFileBlock.FSReaderImpl(readerContext, meta, ByteBuffAllocator.HEAP);
|
||||
new HFileBlock.FSReaderImpl(readerContext, meta, ByteBuffAllocator.HEAP, conf);
|
||||
// Comparator class name is stored in the trailer in version 3.
|
||||
CellComparator comparator = trailer.createComparator();
|
||||
HFileBlockIndex.BlockIndexReader dataBlockIndexReader =
|
||||
|
|
|
@ -176,7 +176,7 @@ public class TestHFileWriterV3WithDataEncoders {
|
|||
.withFileSystem(fs)
|
||||
.withFileSize(fileSize).build();
|
||||
HFileBlock.FSReader blockReader =
|
||||
new HFileBlock.FSReaderImpl(readerContext, meta, ByteBuffAllocator.HEAP);
|
||||
new HFileBlock.FSReaderImpl(readerContext, meta, ByteBuffAllocator.HEAP, conf);
|
||||
// Comparator class name is stored in the trailer in version 3.
|
||||
CellComparator comparator = trailer.createComparator();
|
||||
HFileBlockIndex.BlockIndexReader dataBlockIndexReader =
|
||||
|
@ -277,7 +277,7 @@ public class TestHFileWriterV3WithDataEncoders {
|
|||
origBlock.limit(pos + block.getUncompressedSizeWithoutHeader() - DataBlockEncoding.ID_SIZE);
|
||||
ByteBuff buf = origBlock.slice();
|
||||
DataBlockEncoder.EncodedSeeker seeker =
|
||||
encoder.createSeeker(encoder.newDataBlockDecodingContext(meta));
|
||||
encoder.createSeeker(encoder.newDataBlockDecodingContext(conf, meta));
|
||||
seeker.setCurrentBuffer(buf);
|
||||
Cell res = seeker.getCell();
|
||||
KeyValue kv = keyValues.get(entriesRead);
|
||||
|
|
|
@ -112,7 +112,7 @@ public class TestReseekTo {
|
|||
|
||||
HFile.Reader reader = HFile.createReader(TEST_UTIL.getTestFileSystem(), ncTFile, cacheConf,
|
||||
true, TEST_UTIL.getConfiguration());
|
||||
HFileScanner scanner = reader.getScanner(false, true);
|
||||
HFileScanner scanner = reader.getScanner(TEST_UTIL.getConfiguration(), false, true);
|
||||
|
||||
scanner.seekTo();
|
||||
for (int i = 0; i < keyList.size(); i++) {
|
||||
|
|
|
@ -150,7 +150,7 @@ public class TestSeekBeforeWithInlineBlocks {
|
|||
// Check that we can seekBefore in either direction and with both pread
|
||||
// enabled and disabled
|
||||
for (boolean pread : new boolean[] { false, true }) {
|
||||
HFileScanner scanner = reader.getScanner(true, pread);
|
||||
HFileScanner scanner = reader.getScanner(conf, true, pread);
|
||||
checkNoSeekBefore(cells, scanner, 0);
|
||||
for (int i = 1; i < NUM_KV; i++) {
|
||||
checkSeekBefore(cells, scanner, i);
|
||||
|
|
|
@ -148,7 +148,7 @@ public class TestSeekTo {
|
|||
FileSystem fs = TEST_UTIL.getTestFileSystem();
|
||||
Configuration conf = TEST_UTIL.getConfiguration();
|
||||
HFile.Reader reader = HFile.createReader(fs, p, new CacheConfig(conf), true, conf);
|
||||
HFileScanner scanner = reader.getScanner(false, true);
|
||||
HFileScanner scanner = reader.getScanner(conf, false, true);
|
||||
assertFalse(scanner.seekBefore(toKV("a", tagUsage)));
|
||||
|
||||
assertFalse(scanner.seekBefore(toKV("c", tagUsage)));
|
||||
|
@ -206,7 +206,7 @@ public class TestSeekTo {
|
|||
FileSystem fs = TEST_UTIL.getTestFileSystem();
|
||||
Configuration conf = TEST_UTIL.getConfiguration();
|
||||
HFile.Reader reader = HFile.createReader(fs, p, new CacheConfig(conf), true, conf);
|
||||
HFileScanner scanner = reader.getScanner(false, true);
|
||||
HFileScanner scanner = reader.getScanner(conf, false, true);
|
||||
assertFalse(scanner.seekBefore(toKV("a", tagUsage)));
|
||||
assertFalse(scanner.seekBefore(toKV("b", tagUsage)));
|
||||
assertFalse(scanner.seekBefore(toKV("c", tagUsage)));
|
||||
|
@ -300,7 +300,7 @@ public class TestSeekTo {
|
|||
Configuration conf = TEST_UTIL.getConfiguration();
|
||||
HFile.Reader reader = HFile.createReader(fs, p, new CacheConfig(conf), true, conf);
|
||||
assertEquals(2, reader.getDataBlockIndexReader().getRootBlockCount());
|
||||
HFileScanner scanner = reader.getScanner(false, true);
|
||||
HFileScanner scanner = reader.getScanner(conf, false, true);
|
||||
// lies before the start of the file.
|
||||
assertEquals(-1, scanner.seekTo(toKV("a", tagUsage)));
|
||||
|
||||
|
|
|
@ -119,6 +119,7 @@ public class DataBlockEncodingTool {
|
|||
private static int benchmarkNTimes = DEFAULT_BENCHMARK_N_TIMES;
|
||||
private static int benchmarkNOmit = DEFAULT_BENCHMARK_N_OMIT;
|
||||
|
||||
private final Configuration conf;
|
||||
private List<EncodedDataBlock> codecs = new ArrayList<>();
|
||||
private long totalPrefixLength = 0;
|
||||
private long totalKeyLength = 0;
|
||||
|
@ -157,7 +158,8 @@ public class DataBlockEncodingTool {
|
|||
* @param compressionAlgorithmName What kind of algorithm should be used
|
||||
* as baseline for comparison (e.g. lzo, gz).
|
||||
*/
|
||||
public DataBlockEncodingTool(String compressionAlgorithmName) {
|
||||
public DataBlockEncodingTool(Configuration conf, String compressionAlgorithmName) {
|
||||
this.conf = conf;
|
||||
this.compressionAlgorithmName = compressionAlgorithmName;
|
||||
this.compressionAlgorithm = Compression.getCompressionAlgorithmByName(
|
||||
compressionAlgorithmName);
|
||||
|
@ -242,7 +244,7 @@ public class DataBlockEncodingTool {
|
|||
.withCompression(Compression.Algorithm.NONE)
|
||||
.withIncludesMvcc(includesMemstoreTS)
|
||||
.withIncludesTags(USE_TAG).build();
|
||||
codecs.add(new EncodedDataBlock(d, encoding, rawKVs, meta ));
|
||||
codecs.add(new EncodedDataBlock(conf, d, encoding, rawKVs, meta));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -619,7 +621,7 @@ public class DataBlockEncodingTool {
|
|||
false, hsf.getMaxMemStoreTS(), 0, false);
|
||||
USE_TAG = reader.getHFileReader().getFileContext().isIncludesTags();
|
||||
// run the utilities
|
||||
DataBlockEncodingTool comp = new DataBlockEncodingTool(compressionName);
|
||||
DataBlockEncodingTool comp = new DataBlockEncodingTool(conf, compressionName);
|
||||
int majorVersion = reader.getHFileVersion();
|
||||
comp.useHBaseChecksum = majorVersion > 2 ||
|
||||
(majorVersion == 2 &&
|
||||
|
|
|
@ -231,7 +231,7 @@ public class TestCacheOnWriteInSchema {
|
|||
HFile.Reader reader = sf.getReader().getHFileReader();
|
||||
try {
|
||||
// Open a scanner with (on read) caching disabled
|
||||
HFileScanner scanner = reader.getScanner(false, false);
|
||||
HFileScanner scanner = reader.getScanner(conf, false, false);
|
||||
assertTrue(testDescription, scanner.seekTo());
|
||||
// Cribbed from io.hfile.TestCacheOnWrite
|
||||
long offset = 0;
|
||||
|
|
|
@ -614,7 +614,7 @@ public class TestLoadIncrementalHFiles {
|
|||
Configuration conf = util.getConfiguration();
|
||||
HFile.Reader reader =
|
||||
HFile.createReader(p.getFileSystem(conf), p, new CacheConfig(conf), true, conf);
|
||||
HFileScanner scanner = reader.getScanner(false, false);
|
||||
HFileScanner scanner = reader.getScanner(conf, false, false);
|
||||
scanner.seekTo();
|
||||
int count = 0;
|
||||
do {
|
||||
|
|
Loading…
Reference in New Issue