HBASE-27539 Encapsulate and centralise access to ref count through StoreFileInfo (#4938)

Signed-off-by: Wellington Chevreuil <wchevreuil@apache.org>
This commit is contained in:
chenglei 2022-12-24 19:28:09 +08:00 committed by GitHub
parent cdc48fca41
commit 10309c637e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 126 additions and 90 deletions

View File

@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.io;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
@ -31,6 +30,7 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFileInfo; import org.apache.hadoop.hbase.io.hfile.HFileInfo;
import org.apache.hadoop.hbase.io.hfile.HFileScanner; import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.io.hfile.ReaderContext; import org.apache.hadoop.hbase.io.hfile.ReaderContext;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.regionserver.StoreFileReader; import org.apache.hadoop.hbase.regionserver.StoreFileReader;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
@ -69,13 +69,12 @@ public class HalfStoreFileReader extends StoreFileReader {
* @param fileInfo HFile info * @param fileInfo HFile info
* @param cacheConf CacheConfig * @param cacheConf CacheConfig
* @param r original reference file (contains top or bottom) * @param r original reference file (contains top or bottom)
* @param refCount reference count
* @param conf Configuration * @param conf Configuration
*/ */
public HalfStoreFileReader(final ReaderContext context, final HFileInfo fileInfo, public HalfStoreFileReader(final ReaderContext context, final HFileInfo fileInfo,
final CacheConfig cacheConf, final Reference r, AtomicInteger refCount, final CacheConfig cacheConf, final Reference r, StoreFileInfo storeFileInfo,
final Configuration conf) throws IOException { final Configuration conf) throws IOException {
super(context, fileInfo, cacheConf, refCount, conf); super(context, fileInfo, cacheConf, storeFileInfo, conf);
// This is not actual midkey for this half-file; its just border // This is not actual midkey for this half-file; its just border
// around which we split top and bottom. Have to look in files to find // around which we split top and bottom. Have to look in files to find
// actual last and first keys for bottom and top halves. Half-files don't // actual last and first keys for bottom and top halves. Half-files don't

View File

@ -349,12 +349,12 @@ public class HStoreFile implements StoreFile {
} }
public int getRefCount() { public int getRefCount() {
return fileInfo.refCount.get(); return fileInfo.getRefCount();
} }
/** Returns true if the file is still used in reads */ /** Returns true if the file is still used in reads */
public boolean isReferencedInReads() { public boolean isReferencedInReads() {
int rc = fileInfo.refCount.get(); int rc = fileInfo.getRefCount();
assert rc >= 0; // we should not go negative. assert rc >= 0; // we should not go negative.
return rc > 0; return rc > 0;
} }
@ -653,11 +653,11 @@ public class HStoreFile implements StoreFile {
} }
long increaseRefCount() { long increaseRefCount() {
return this.fileInfo.refCount.incrementAndGet(); return this.fileInfo.increaseRefCount();
} }
long decreaseRefCount() { long decreaseRefCount() {
return this.fileInfo.refCount.decrementAndGet(); return this.fileInfo.decreaseRefCount();
} }
static void increaseStoreFilesRefeCount(Collection<HStoreFile> storeFiles) { static void increaseStoreFilesRefeCount(Collection<HStoreFile> storeFiles) {

View File

@ -108,7 +108,7 @@ public class StoreFileInfo implements Configurable {
// Counter that is incremented every time a scanner is created on the // Counter that is incremented every time a scanner is created on the
// store file. It is decremented when the scan on the store file is // store file. It is decremented when the scan on the store file is
// done. // done.
final AtomicInteger refCount = new AtomicInteger(0); private final AtomicInteger refCount = new AtomicInteger(0);
/** /**
* Create a Store File Info * Create a Store File Info
@ -275,12 +275,13 @@ public class StoreFileInfo implements Configurable {
return this.hdfsBlocksDistribution; return this.hdfsBlocksDistribution;
} }
StoreFileReader createReader(ReaderContext context, CacheConfig cacheConf) throws IOException { public StoreFileReader createReader(ReaderContext context, CacheConfig cacheConf)
throws IOException {
StoreFileReader reader = null; StoreFileReader reader = null;
if (this.reference != null) { if (this.reference != null) {
reader = new HalfStoreFileReader(context, hfileInfo, cacheConf, reference, refCount, conf); reader = new HalfStoreFileReader(context, hfileInfo, cacheConf, reference, this, conf);
} else { } else {
reader = new StoreFileReader(context, hfileInfo, cacheConf, refCount, conf); reader = new StoreFileReader(context, hfileInfo, cacheConf, this, conf);
} }
return reader; return reader;
} }
@ -668,7 +669,7 @@ public class StoreFileInfo implements Configurable {
return this.noReadahead; return this.noReadahead;
} }
HFileInfo getHFileInfo() { public HFileInfo getHFileInfo() {
return hfileInfo; return hfileInfo;
} }
@ -700,4 +701,16 @@ public class StoreFileInfo implements Configurable {
this.hfileInfo = new HFileInfo(context, conf); this.hfileInfo = new HFileInfo(context, conf);
} }
int getRefCount() {
return this.refCount.get();
}
int increaseRefCount() {
return this.refCount.incrementAndGet();
}
int decreaseRefCount() {
return this.refCount.decrementAndGet();
}
} }

View File

@ -27,7 +27,6 @@ import java.io.IOException;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.SortedSet; import java.util.SortedSet;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.CellComparator;
@ -78,24 +77,26 @@ public class StoreFileReader {
private int prefixLength = -1; private int prefixLength = -1;
protected Configuration conf; protected Configuration conf;
// Counter that is incremented every time a scanner is created on the /**
// store file. It is decremented when the scan on the store file is * All {@link StoreFileReader} for the same StoreFile will share the
// done. All StoreFileReader for the same StoreFile will share this counter. * {@link StoreFileInfo#refCount}. Counter that is incremented every time a scanner is created on
private final AtomicInteger refCount; * the store file. It is decremented when the scan on the store file is done.
*/
private final StoreFileInfo storeFileInfo;
private final ReaderContext context; private final ReaderContext context;
private StoreFileReader(HFile.Reader reader, AtomicInteger refCount, ReaderContext context, private StoreFileReader(HFile.Reader reader, StoreFileInfo storeFileInfo, ReaderContext context,
Configuration conf) { Configuration conf) {
this.reader = reader; this.reader = reader;
bloomFilterType = BloomType.NONE; bloomFilterType = BloomType.NONE;
this.refCount = refCount; this.storeFileInfo = storeFileInfo;
this.context = context; this.context = context;
this.conf = conf; this.conf = conf;
} }
public StoreFileReader(ReaderContext context, HFileInfo fileInfo, CacheConfig cacheConf, public StoreFileReader(ReaderContext context, HFileInfo fileInfo, CacheConfig cacheConf,
AtomicInteger refCount, Configuration conf) throws IOException { StoreFileInfo storeFileInfo, Configuration conf) throws IOException {
this(HFile.createReader(context, fileInfo, cacheConf, conf), refCount, context, conf); this(HFile.createReader(context, fileInfo, cacheConf, conf), storeFileInfo, context, conf);
} }
void copyFields(StoreFileReader storeFileReader) throws IOException { void copyFields(StoreFileReader storeFileReader) throws IOException {
@ -120,7 +121,7 @@ public class StoreFileReader {
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
StoreFileReader() { StoreFileReader() {
this.refCount = new AtomicInteger(0); this.storeFileInfo = null;
this.reader = null; this.reader = null;
this.context = null; this.context = null;
} }
@ -151,7 +152,7 @@ public class StoreFileReader {
* is opened. * is opened.
*/ */
int getRefCount() { int getRefCount() {
return refCount.get(); return storeFileInfo.getRefCount();
} }
/** /**
@ -159,7 +160,7 @@ public class StoreFileReader {
* count so reader is not close until some object is holding the lock * count so reader is not close until some object is holding the lock
*/ */
void incrementRefCount() { void incrementRefCount() {
refCount.incrementAndGet(); storeFileInfo.increaseRefCount();
} }
/** /**
@ -167,7 +168,7 @@ public class StoreFileReader {
* count, and also, if this is not the common pread reader, we should close it. * count, and also, if this is not the common pread reader, we should close it.
*/ */
void readCompleted() { void readCompleted() {
refCount.decrementAndGet(); storeFileInfo.decreaseRefCount();
if (context.getReaderType() == ReaderType.STREAM) { if (context.getReaderType() == ReaderType.STREAM) {
try { try {
reader.close(false); reader.close(false);

View File

@ -402,7 +402,7 @@ public class StoreFileWriter implements CellSink, ShipperListener {
* @param dir Directory to create file in. * @param dir Directory to create file in.
* @return random filename inside passed <code>dir</code> * @return random filename inside passed <code>dir</code>
*/ */
static Path getUniqueFile(final FileSystem fs, final Path dir) throws IOException { public static Path getUniqueFile(final FileSystem fs, final Path dir) throws IOException {
if (!fs.getFileStatus(dir).isDirectory()) { if (!fs.getFileStatus(dir).isDirectory()) {
throw new IOException("Expecting " + dir.toString() + " to be a directory"); throw new IOException("Expecting " + dir.toString() + " to be a directory");
} }

View File

@ -1168,10 +1168,11 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
StoreFileWriter halfWriter = null; StoreFileWriter halfWriter = null;
try { try {
ReaderContext context = new ReaderContextBuilder().withFileSystemAndPath(fs, inFile).build(); ReaderContext context = new ReaderContextBuilder().withFileSystemAndPath(fs, inFile).build();
HFileInfo hfile = new HFileInfo(context, conf); StoreFileInfo storeFileInfo =
halfReader = new StoreFileInfo(conf, fs, fs.getFileStatus(inFile), reference);
new HalfStoreFileReader(context, hfile, cacheConf, reference, new AtomicInteger(0), conf); storeFileInfo.initHFileInfo(context);
hfile.initMetaAndIndex(halfReader.getHFileReader()); halfReader = (HalfStoreFileReader) storeFileInfo.createReader(context, cacheConf);
storeFileInfo.getHFileInfo().initMetaAndIndex(halfReader.getHFileReader());
Map<byte[], byte[]> fileInfo = halfReader.loadFileInfo(); Map<byte[], byte[]> fileInfo = halfReader.loadFileInfo();
int blocksize = familyDescriptor.getBlocksize(); int blocksize = familyDescriptor.getBlocksize();

View File

@ -24,7 +24,6 @@ import static org.junit.Assert.assertTrue;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
@ -39,10 +38,10 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext; import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.HFileInfo;
import org.apache.hadoop.hbase.io.hfile.HFileScanner; import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.io.hfile.ReaderContext; import org.apache.hadoop.hbase.io.hfile.ReaderContext;
import org.apache.hadoop.hbase.io.hfile.ReaderContextBuilder; import org.apache.hadoop.hbase.io.hfile.ReaderContextBuilder;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.testclassification.IOTests; import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
@ -118,10 +117,12 @@ public class TestHalfStoreFileReader {
private void doTestOfScanAndReseek(Path p, FileSystem fs, Reference bottom, CacheConfig cacheConf) private void doTestOfScanAndReseek(Path p, FileSystem fs, Reference bottom, CacheConfig cacheConf)
throws IOException { throws IOException {
ReaderContext context = new ReaderContextBuilder().withFileSystemAndPath(fs, p).build(); ReaderContext context = new ReaderContextBuilder().withFileSystemAndPath(fs, p).build();
HFileInfo fileInfo = new HFileInfo(context, TEST_UTIL.getConfiguration()); StoreFileInfo storeFileInfo =
final HalfStoreFileReader halfreader = new HalfStoreFileReader(context, fileInfo, cacheConf, new StoreFileInfo(TEST_UTIL.getConfiguration(), fs, fs.getFileStatus(p), bottom);
bottom, new AtomicInteger(0), TEST_UTIL.getConfiguration()); storeFileInfo.initHFileInfo(context);
fileInfo.initMetaAndIndex(halfreader.getHFileReader()); final HalfStoreFileReader halfreader =
(HalfStoreFileReader) storeFileInfo.createReader(context, cacheConf);
storeFileInfo.getHFileInfo().initMetaAndIndex(halfreader.getHFileReader());
halfreader.loadFileInfo(); halfreader.loadFileInfo();
final HFileScanner scanner = halfreader.getScanner(false, false); final HFileScanner scanner = halfreader.getScanner(false, false);
@ -214,10 +215,12 @@ public class TestHalfStoreFileReader {
private Cell doTestOfSeekBefore(Path p, FileSystem fs, Reference bottom, Cell seekBefore, private Cell doTestOfSeekBefore(Path p, FileSystem fs, Reference bottom, Cell seekBefore,
CacheConfig cacheConfig) throws IOException { CacheConfig cacheConfig) throws IOException {
ReaderContext context = new ReaderContextBuilder().withFileSystemAndPath(fs, p).build(); ReaderContext context = new ReaderContextBuilder().withFileSystemAndPath(fs, p).build();
HFileInfo fileInfo = new HFileInfo(context, TEST_UTIL.getConfiguration()); StoreFileInfo storeFileInfo =
final HalfStoreFileReader halfreader = new HalfStoreFileReader(context, fileInfo, cacheConfig, new StoreFileInfo(TEST_UTIL.getConfiguration(), fs, fs.getFileStatus(p), bottom);
bottom, new AtomicInteger(0), TEST_UTIL.getConfiguration()); storeFileInfo.initHFileInfo(context);
fileInfo.initMetaAndIndex(halfreader.getHFileReader()); final HalfStoreFileReader halfreader =
(HalfStoreFileReader) storeFileInfo.createReader(context, cacheConfig);
storeFileInfo.getHFileInfo().initMetaAndIndex(halfreader.getHFileReader());
halfreader.loadFileInfo(); halfreader.loadFileInfo();
final HFileScanner scanner = halfreader.getScanner(false, false); final HFileScanner scanner = halfreader.getScanner(false, false);
scanner.seekBefore(seekBefore); scanner.seekBefore(seekBefore);

View File

@ -39,7 +39,6 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.OptionalLong; import java.util.OptionalLong;
import java.util.TreeSet; import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction; import java.util.function.BiFunction;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
@ -76,7 +75,6 @@ import org.apache.hadoop.hbase.io.hfile.HFileBlock;
import org.apache.hadoop.hbase.io.hfile.HFileContext; import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder; import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
import org.apache.hadoop.hbase.io.hfile.HFileInfo;
import org.apache.hadoop.hbase.io.hfile.HFileScanner; import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.io.hfile.PreviousBlockCompressionRatePredicator; import org.apache.hadoop.hbase.io.hfile.PreviousBlockCompressionRatePredicator;
import org.apache.hadoop.hbase.io.hfile.ReaderContext; import org.apache.hadoop.hbase.io.hfile.ReaderContext;
@ -119,7 +117,7 @@ public class TestHStoreFile {
private static final Logger LOG = LoggerFactory.getLogger(TestHStoreFile.class); private static final Logger LOG = LoggerFactory.getLogger(TestHStoreFile.class);
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private CacheConfig cacheConf = new CacheConfig(TEST_UTIL.getConfiguration()); private CacheConfig cacheConf = new CacheConfig(TEST_UTIL.getConfiguration());
private static String ROOT_DIR = TEST_UTIL.getDataTestDir("TestStoreFile").toString(); private static Path ROOT_DIR = TEST_UTIL.getDataTestDir("TestStoreFile");
private static final ChecksumType CKTYPE = ChecksumType.CRC32C; private static final ChecksumType CKTYPE = ChecksumType.CRC32C;
private static final int CKBYTES = 512; private static final int CKBYTES = 512;
private static String TEST_FAMILY = "cf"; private static String TEST_FAMILY = "cf";
@ -594,10 +592,10 @@ public class TestHStoreFile {
writer.close(); writer.close();
ReaderContext context = new ReaderContextBuilder().withFileSystemAndPath(fs, f).build(); ReaderContext context = new ReaderContextBuilder().withFileSystemAndPath(fs, f).build();
HFileInfo fileInfo = new HFileInfo(context, conf); StoreFileInfo storeFileInfo = new StoreFileInfo(conf, fs, f, true);
StoreFileReader reader = storeFileInfo.initHFileInfo(context);
new StoreFileReader(context, fileInfo, cacheConf, new AtomicInteger(0), conf); StoreFileReader reader = storeFileInfo.createReader(context, cacheConf);
fileInfo.initMetaAndIndex(reader.getHFileReader()); storeFileInfo.getHFileInfo().initMetaAndIndex(reader.getHFileReader());
reader.loadFileInfo(); reader.loadFileInfo();
reader.loadBloomfilter(); reader.loadBloomfilter();
StoreFileScanner scanner = getStoreFileScanner(reader, false, false); StoreFileScanner scanner = getStoreFileScanner(reader, false, false);
@ -642,7 +640,10 @@ public class TestHStoreFile {
conf.setBoolean(BloomFilterFactory.IO_STOREFILE_BLOOM_ENABLED, true); conf.setBoolean(BloomFilterFactory.IO_STOREFILE_BLOOM_ENABLED, true);
// write the file // write the file
Path f = new Path(ROOT_DIR, name.getMethodName()); if (!fs.exists(ROOT_DIR)) {
fs.mkdirs(ROOT_DIR);
}
Path f = StoreFileWriter.getUniqueFile(fs, ROOT_DIR);
HFileContext meta = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL) HFileContext meta = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL)
.withChecksumType(CKTYPE).withBytesPerCheckSum(CKBYTES).build(); .withChecksumType(CKTYPE).withBytesPerCheckSum(CKBYTES).build();
// Make a store file and write data to it. // Make a store file and write data to it.
@ -658,7 +659,10 @@ public class TestHStoreFile {
float err = conf.getFloat(BloomFilterFactory.IO_STOREFILE_BLOOM_ERROR_RATE, 0); float err = conf.getFloat(BloomFilterFactory.IO_STOREFILE_BLOOM_ERROR_RATE, 0);
// write the file // write the file
Path f = new Path(ROOT_DIR, name.getMethodName()); if (!fs.exists(ROOT_DIR)) {
fs.mkdirs(ROOT_DIR);
}
Path f = StoreFileWriter.getUniqueFile(fs, ROOT_DIR);
HFileContext meta = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL) HFileContext meta = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL)
.withChecksumType(CKTYPE).withBytesPerCheckSum(CKBYTES).build(); .withChecksumType(CKTYPE).withBytesPerCheckSum(CKBYTES).build();
@ -677,10 +681,10 @@ public class TestHStoreFile {
writer.close(); writer.close();
ReaderContext context = new ReaderContextBuilder().withFileSystemAndPath(fs, f).build(); ReaderContext context = new ReaderContextBuilder().withFileSystemAndPath(fs, f).build();
HFileInfo fileInfo = new HFileInfo(context, conf); StoreFileInfo storeFileInfo = new StoreFileInfo(conf, fs, f, true);
StoreFileReader reader = storeFileInfo.initHFileInfo(context);
new StoreFileReader(context, fileInfo, cacheConf, new AtomicInteger(0), conf); StoreFileReader reader = storeFileInfo.createReader(context, cacheConf);
fileInfo.initMetaAndIndex(reader.getHFileReader()); storeFileInfo.getHFileInfo().initMetaAndIndex(reader.getHFileReader());
reader.loadFileInfo(); reader.loadFileInfo();
reader.loadBloomfilter(); reader.loadBloomfilter();
@ -716,7 +720,11 @@ public class TestHStoreFile {
@Test @Test
public void testReseek() throws Exception { public void testReseek() throws Exception {
// write the file // write the file
Path f = new Path(ROOT_DIR, name.getMethodName()); if (!fs.exists(ROOT_DIR)) {
fs.mkdirs(ROOT_DIR);
}
Path f = StoreFileWriter.getUniqueFile(fs, ROOT_DIR);
HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build(); HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build();
// Make a store file and write data to it. // Make a store file and write data to it.
StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, this.fs).withFilePath(f) StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, this.fs).withFilePath(f)
@ -726,10 +734,10 @@ public class TestHStoreFile {
writer.close(); writer.close();
ReaderContext context = new ReaderContextBuilder().withFileSystemAndPath(fs, f).build(); ReaderContext context = new ReaderContextBuilder().withFileSystemAndPath(fs, f).build();
HFileInfo fileInfo = new HFileInfo(context, conf); StoreFileInfo storeFileInfo = new StoreFileInfo(conf, fs, f, true);
StoreFileReader reader = storeFileInfo.initHFileInfo(context);
new StoreFileReader(context, fileInfo, cacheConf, new AtomicInteger(0), conf); StoreFileReader reader = storeFileInfo.createReader(context, cacheConf);
fileInfo.initMetaAndIndex(reader.getHFileReader()); storeFileInfo.getHFileInfo().initMetaAndIndex(reader.getHFileReader());
// Now do reseek with empty KV to position to the beginning of the file // Now do reseek with empty KV to position to the beginning of the file
@ -760,9 +768,13 @@ public class TestHStoreFile {
// 2nd for loop for every column (2*colCount) // 2nd for loop for every column (2*colCount)
float[] expErr = { 2 * rowCount * colCount * err, 2 * rowCount * 2 * colCount * err }; float[] expErr = { 2 * rowCount * colCount * err, 2 * rowCount * 2 * colCount * err };
if (!fs.exists(ROOT_DIR)) {
fs.mkdirs(ROOT_DIR);
}
for (int x : new int[] { 0, 1 }) { for (int x : new int[] { 0, 1 }) {
// write the file // write the file
Path f = new Path(ROOT_DIR, name.getMethodName() + x); Path f = StoreFileWriter.getUniqueFile(fs, ROOT_DIR);
HFileContext meta = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL) HFileContext meta = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL)
.withChecksumType(CKTYPE).withBytesPerCheckSum(CKBYTES).build(); .withChecksumType(CKTYPE).withBytesPerCheckSum(CKBYTES).build();
// Make a store file and write data to it. // Make a store file and write data to it.
@ -786,10 +798,10 @@ public class TestHStoreFile {
ReaderContext context = ReaderContext context =
new ReaderContextBuilder().withFilePath(f).withFileSize(fs.getFileStatus(f).getLen()) new ReaderContextBuilder().withFilePath(f).withFileSize(fs.getFileStatus(f).getLen())
.withFileSystem(fs).withInputStreamWrapper(new FSDataInputStreamWrapper(fs, f)).build(); .withFileSystem(fs).withInputStreamWrapper(new FSDataInputStreamWrapper(fs, f)).build();
HFileInfo fileInfo = new HFileInfo(context, conf); StoreFileInfo storeFileInfo = new StoreFileInfo(conf, fs, f, true);
StoreFileReader reader = storeFileInfo.initHFileInfo(context);
new StoreFileReader(context, fileInfo, cacheConf, new AtomicInteger(0), conf); StoreFileReader reader = storeFileInfo.createReader(context, cacheConf);
fileInfo.initMetaAndIndex(reader.getHFileReader()); storeFileInfo.getHFileInfo().initMetaAndIndex(reader.getHFileReader());
reader.loadFileInfo(); reader.loadFileInfo();
reader.loadBloomfilter(); reader.loadBloomfilter();
StoreFileScanner scanner = getStoreFileScanner(reader, false, false); StoreFileScanner scanner = getStoreFileScanner(reader, false, false);

View File

@ -24,7 +24,6 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
@ -37,7 +36,6 @@ import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFileContext; import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.HFileInfo;
import org.apache.hadoop.hbase.io.hfile.ReaderContext; import org.apache.hadoop.hbase.io.hfile.ReaderContext;
import org.apache.hadoop.hbase.io.hfile.ReaderContextBuilder; import org.apache.hadoop.hbase.io.hfile.ReaderContextBuilder;
import org.apache.hadoop.hbase.log.HBaseMarkers; import org.apache.hadoop.hbase.log.HBaseMarkers;
@ -180,15 +178,18 @@ public class TestRowPrefixBloomFilter {
float expErr = 2 * prefixRowCount * suffixRowCount * err; float expErr = 2 * prefixRowCount * suffixRowCount * err;
int expKeys = fixedLengthExpKeys; int expKeys = fixedLengthExpKeys;
// write the file // write the file
Path f = new Path(testDir, name.getMethodName()); if (!fs.exists(testDir)) {
fs.mkdirs(testDir);
}
Path f = StoreFileWriter.getUniqueFile(fs, testDir);
writeStoreFile(f, bt, expKeys); writeStoreFile(f, bt, expKeys);
// read the file // read the file
ReaderContext context = new ReaderContextBuilder().withFileSystemAndPath(fs, f).build(); ReaderContext context = new ReaderContextBuilder().withFileSystemAndPath(fs, f).build();
HFileInfo fileInfo = new HFileInfo(context, conf); StoreFileInfo storeFileInfo = new StoreFileInfo(conf, fs, f, true);
StoreFileReader reader = storeFileInfo.initHFileInfo(context);
new StoreFileReader(context, fileInfo, cacheConf, new AtomicInteger(0), conf); StoreFileReader reader = storeFileInfo.createReader(context, cacheConf);
fileInfo.initMetaAndIndex(reader.getHFileReader()); storeFileInfo.getHFileInfo().initMetaAndIndex(reader.getHFileReader());
reader.loadFileInfo(); reader.loadFileInfo();
reader.loadBloomfilter(); reader.loadBloomfilter();
@ -251,14 +252,17 @@ public class TestRowPrefixBloomFilter {
FileSystem fs = FileSystem.getLocal(conf); FileSystem fs = FileSystem.getLocal(conf);
int expKeys = fixedLengthExpKeys; int expKeys = fixedLengthExpKeys;
// write the file // write the file
Path f = new Path(testDir, name.getMethodName()); if (!fs.exists(testDir)) {
fs.mkdirs(testDir);
}
Path f = StoreFileWriter.getUniqueFile(fs, testDir);
writeStoreFile(f, bt, expKeys); writeStoreFile(f, bt, expKeys);
ReaderContext context = new ReaderContextBuilder().withFileSystemAndPath(fs, f).build(); ReaderContext context = new ReaderContextBuilder().withFileSystemAndPath(fs, f).build();
HFileInfo fileInfo = new HFileInfo(context, conf); StoreFileInfo storeFileInfo = new StoreFileInfo(conf, fs, f, true);
StoreFileReader reader = storeFileInfo.initHFileInfo(context);
new StoreFileReader(context, fileInfo, cacheConf, new AtomicInteger(0), conf); StoreFileReader reader = storeFileInfo.createReader(context, cacheConf);
fileInfo.initMetaAndIndex(reader.getHFileReader()); storeFileInfo.getHFileInfo().initMetaAndIndex(reader.getHFileReader());
reader.loadFileInfo(); reader.loadFileInfo();
reader.loadBloomfilter(); reader.loadBloomfilter();
@ -304,14 +308,17 @@ public class TestRowPrefixBloomFilter {
FileSystem fs = FileSystem.getLocal(conf); FileSystem fs = FileSystem.getLocal(conf);
int expKeys = fixedLengthExpKeys; int expKeys = fixedLengthExpKeys;
// write the file // write the file
Path f = new Path(testDir, name.getMethodName()); if (!fs.exists(testDir)) {
fs.mkdirs(testDir);
}
Path f = StoreFileWriter.getUniqueFile(fs, testDir);
writeStoreFile(f, bt, expKeys); writeStoreFile(f, bt, expKeys);
ReaderContext context = new ReaderContextBuilder().withFileSystemAndPath(fs, f).build(); ReaderContext context = new ReaderContextBuilder().withFileSystemAndPath(fs, f).build();
HFileInfo fileInfo = new HFileInfo(context, conf); StoreFileInfo storeFileInfo = new StoreFileInfo(conf, fs, f, true);
StoreFileReader reader = storeFileInfo.initHFileInfo(context);
new StoreFileReader(context, fileInfo, cacheConf, new AtomicInteger(0), conf); StoreFileReader reader = storeFileInfo.createReader(context, cacheConf);
fileInfo.initMetaAndIndex(reader.getHFileReader()); storeFileInfo.getHFileInfo().initMetaAndIndex(reader.getHFileReader());
reader.loadFileInfo(); reader.loadFileInfo();
reader.loadBloomfilter(); reader.loadBloomfilter();

View File

@ -22,7 +22,6 @@ import static org.junit.Assert.assertTrue;
import java.io.IOException; import java.io.IOException;
import java.util.List; import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
@ -38,7 +37,6 @@ import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFileContext; import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.HFileInfo;
import org.apache.hadoop.hbase.io.hfile.ReaderContext; import org.apache.hadoop.hbase.io.hfile.ReaderContext;
import org.apache.hadoop.hbase.io.hfile.ReaderContextBuilder; import org.apache.hadoop.hbase.io.hfile.ReaderContextBuilder;
import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests;
@ -60,8 +58,7 @@ public class TestStoreFileScannerWithTagCompression {
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static Configuration conf = TEST_UTIL.getConfiguration(); private static Configuration conf = TEST_UTIL.getConfiguration();
private static CacheConfig cacheConf = new CacheConfig(TEST_UTIL.getConfiguration()); private static CacheConfig cacheConf = new CacheConfig(TEST_UTIL.getConfiguration());
private static String ROOT_DIR = private static Path ROOT_DIR = TEST_UTIL.getDataTestDir("TestStoreFileScannerWithTagCompression");
TEST_UTIL.getDataTestDir("TestStoreFileScannerWithTagCompression").toString();
private static FileSystem fs = null; private static FileSystem fs = null;
@BeforeClass @BeforeClass
@ -73,7 +70,10 @@ public class TestStoreFileScannerWithTagCompression {
@Test @Test
public void testReseek() throws Exception { public void testReseek() throws Exception {
// write the file // write the file
Path f = new Path(ROOT_DIR, "testReseek"); if (!fs.exists(ROOT_DIR)) {
fs.mkdirs(ROOT_DIR);
}
Path f = StoreFileWriter.getUniqueFile(fs, ROOT_DIR);
HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).withIncludesTags(true) HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).withIncludesTags(true)
.withCompressTags(true).withDataBlockEncoding(DataBlockEncoding.PREFIX).build(); .withCompressTags(true).withDataBlockEncoding(DataBlockEncoding.PREFIX).build();
// Make a store file and write data to it. // Make a store file and write data to it.
@ -84,10 +84,10 @@ public class TestStoreFileScannerWithTagCompression {
writer.close(); writer.close();
ReaderContext context = new ReaderContextBuilder().withFileSystemAndPath(fs, f).build(); ReaderContext context = new ReaderContextBuilder().withFileSystemAndPath(fs, f).build();
HFileInfo fileInfo = new HFileInfo(context, conf); StoreFileInfo storeFileInfo = new StoreFileInfo(conf, fs, f, true);
StoreFileReader reader = storeFileInfo.initHFileInfo(context);
new StoreFileReader(context, fileInfo, cacheConf, new AtomicInteger(0), conf); StoreFileReader reader = storeFileInfo.createReader(context, cacheConf);
fileInfo.initMetaAndIndex(reader.getHFileReader()); storeFileInfo.getHFileInfo().initMetaAndIndex(reader.getHFileReader());
StoreFileScanner s = reader.getStoreFileScanner(false, false, false, 0, 0, false); StoreFileScanner s = reader.getStoreFileScanner(false, false, false, 0, 0, false);
try { try {
// Now do reseek with empty KV to position to the beginning of the file // Now do reseek with empty KV to position to the beginning of the file