HBASE-15296 Break out writer and reader from StoreFile. Done using Intellij15 Refactor > Move. (Apekshit)

Change-Id: Ie719569cc3393e0b5361e9d462c3cf125ad5144e

Signed-off-by: stack <stack@apache.org>
Authored by Apekshit on 2016-04-12 13:30:00 -07:00; committed by stack
parent dbdfd8e8d1
commit 7efb9edecb
61 changed files with 1412 additions and 1332 deletions
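At a glance, the refactor turns the inner classes StoreFile.Reader, StoreFile.Writer and StoreFile.WriterBuilder into the top-level classes StoreFileReader, StoreFileWriter and StoreFileWriter.Builder in org.apache.hadoop.hbase.regionserver, and callers are updated accordingly. A minimal before/after sketch of caller code (not part of the diff; conf, cacheConf, fs, path, hFileContext and storeFile are assumed to be in scope, and only builder options that appear in this patch are shown):

// Before this change: writer and reader types nested inside StoreFile.
StoreFile.Writer oldWriter = new StoreFile.WriterBuilder(conf, cacheConf, fs)
    .withFilePath(path)
    .withFileContext(hFileContext)
    .build();
StoreFile.Reader oldReader = storeFile.getReader();

// After this change: the same options on the standalone classes.
StoreFileWriter newWriter = new StoreFileWriter.Builder(conf, cacheConf, fs)
    .withFilePath(path)
    .withFileContext(hFileContext)
    .build();
StoreFileReader newReader = storeFile.getReader();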

View File

@ -55,7 +55,7 @@ import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFile.Reader;
import org.apache.hadoop.hbase.regionserver.StoreFileReader;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
@ -485,16 +485,16 @@ public class BaseRegionObserver implements RegionObserver {
}
@Override
public Reader preStoreFileReaderOpen(ObserverContext<RegionCoprocessorEnvironment> ctx,
public StoreFileReader preStoreFileReaderOpen(ObserverContext<RegionCoprocessorEnvironment> ctx,
FileSystem fs, Path p, FSDataInputStreamWrapper in, long size, CacheConfig cacheConf,
Reference r, Reader reader) throws IOException {
Reference r, StoreFileReader reader) throws IOException {
return reader;
}
@Override
public Reader postStoreFileReaderOpen(ObserverContext<RegionCoprocessorEnvironment> ctx,
public StoreFileReader postStoreFileReaderOpen(ObserverContext<RegionCoprocessorEnvironment> ctx,
FileSystem fs, Path p, FSDataInputStreamWrapper in, long size, CacheConfig cacheConf,
Reference r, Reader reader) throws IOException {
Reference r, StoreFileReader reader) throws IOException {
return reader;
}
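For coprocessor authors, the visible change is only in the hook signatures: the reader parameter and return type are now StoreFileReader. A hedged sketch of an observer compiled against the new signature (the class name is illustrative; the pass-through body mirrors the default shown above):

public class ExampleRegionObserver extends BaseRegionObserver {
  @Override
  public StoreFileReader postStoreFileReaderOpen(ObserverContext<RegionCoprocessorEnvironment> ctx,
      FileSystem fs, Path p, FSDataInputStreamWrapper in, long size, CacheConfig cacheConf,
      Reference r, StoreFileReader reader) throws IOException {
    // Return the reader unchanged; a real observer could wrap or replace it here.
    return reader;
  }
}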

View File

@ -55,6 +55,7 @@ import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileReader;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.wal.WALKey;
@ -1210,9 +1211,9 @@ public interface RegionObserver extends Coprocessor {
* default behavior, null otherwise
* @throws IOException
*/
StoreFile.Reader preStoreFileReaderOpen(final ObserverContext<RegionCoprocessorEnvironment> ctx,
StoreFileReader preStoreFileReaderOpen(final ObserverContext<RegionCoprocessorEnvironment> ctx,
final FileSystem fs, final Path p, final FSDataInputStreamWrapper in, long size,
final CacheConfig cacheConf, final Reference r, StoreFile.Reader reader) throws IOException;
final CacheConfig cacheConf, final Reference r, StoreFileReader reader) throws IOException;
/**
* Called after the creation of Reader for a store file.
@ -1228,9 +1229,9 @@ public interface RegionObserver extends Coprocessor {
* @return The reader to use
* @throws IOException
*/
StoreFile.Reader postStoreFileReaderOpen(final ObserverContext<RegionCoprocessorEnvironment> ctx,
StoreFileReader postStoreFileReaderOpen(final ObserverContext<RegionCoprocessorEnvironment> ctx,
final FileSystem fs, final Path p, final FSDataInputStreamWrapper in, long size,
final CacheConfig cacheConf, final Reference r, StoreFile.Reader reader) throws IOException;
final CacheConfig cacheConf, final Reference r, StoreFileReader reader) throws IOException;
/**
* Called after a new cell has been created during an increment operation, but before

View File

@ -33,7 +33,7 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileReader;
import org.apache.hadoop.hbase.util.Bytes;
/**
@ -50,7 +50,7 @@ import org.apache.hadoop.hbase.util.Bytes;
* <p>This file is not splitable. Calls to {@link #midkey()} return null.
*/
@InterfaceAudience.Private
public class HalfStoreFileReader extends StoreFile.Reader {
public class HalfStoreFileReader extends StoreFileReader {
private static final Log LOG = LogFactory.getLog(HalfStoreFileReader.class);
final boolean top;
// This is the key we split around. Its the first possible entry on a row:

View File

@ -29,6 +29,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.util.BloomFilterChunk;
import org.apache.hadoop.hbase.util.BloomFilterUtil;
import org.apache.hadoop.hbase.util.BloomFilterWriter;
@ -155,7 +156,7 @@ public class CompoundBloomFilterWriter extends CompoundBloomFilterBase
* Adds a Bloom filter key. This key must be greater than the previous key,
* as defined by the comparator this compound Bloom filter is configured
* with. For efficiency, key monotonicity is not checked here. See
* {@link org.apache.hadoop.hbase.regionserver.StoreFile.Writer#append(
* {@link StoreFileWriter#append(
* org.apache.hadoop.hbase.Cell)} for the details of deduplication.
*/
@Override

View File

@ -65,6 +65,7 @@ import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
@ -305,12 +306,12 @@ public class HFileOutputFormat2
if (null == favoredNodes) {
wl.writer =
new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), fs)
new StoreFileWriter.Builder(conf, new CacheConfig(tempConf), fs)
.withOutputDir(familydir).withBloomType(bloomType)
.withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext).build();
} else {
wl.writer =
new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), new HFileSystem(fs))
new StoreFileWriter.Builder(conf, new CacheConfig(tempConf), new HFileSystem(fs))
.withOutputDir(familydir).withBloomType(bloomType)
.withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext)
.withFavoredNodes(favoredNodes).build();
@ -320,7 +321,7 @@ public class HFileOutputFormat2
return wl;
}
private void close(final StoreFile.Writer w) throws IOException {
private void close(final StoreFileWriter w) throws IOException {
if (w != null) {
w.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY,
Bytes.toBytes(System.currentTimeMillis()));
@ -350,7 +351,7 @@ public class HFileOutputFormat2
*/
static class WriterLength {
long written = 0;
StoreFile.Writer writer = null;
StoreFileWriter writer = null;
}
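The close(StoreFileWriter) helper above stamps bulk-load metadata before closing the file; the typical write path with the renamed builder looks roughly like the sketch below (illustrative only; conf, fs, familyDir, bloomType, hFileContext and cell are assumed, and the builder options are the ones used in this file):

StoreFileWriter writer = new StoreFileWriter.Builder(conf, new CacheConfig(conf), fs)
    .withOutputDir(familyDir)
    .withBloomType(bloomType)
    .withComparator(CellComparator.COMPARATOR)
    .withFileContext(hFileContext)
    .build();
try {
  writer.append(cell);   // cells must be appended in comparator order
  writer.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY,
      Bytes.toBytes(System.currentTimeMillis()));
} finally {
  writer.close();
}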
/**

View File

@ -65,8 +65,8 @@ import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint;
import org.apache.hadoop.hbase.security.token.FsDelegationToken;
@ -890,7 +890,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
FileSystem fs = inFile.getFileSystem(conf);
CacheConfig cacheConf = new CacheConfig(conf);
HalfStoreFileReader halfReader = null;
StoreFile.Writer halfWriter = null;
StoreFileWriter halfWriter = null;
try {
halfReader = new HalfStoreFileReader(fs, inFile, cacheConf, reference, conf);
Map<byte[], byte[]> fileInfo = halfReader.loadFileInfo();
@ -906,7 +906,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
.withDataBlockEncoding(familyDescriptor.getDataBlockEncoding())
.withIncludesTags(true)
.build();
halfWriter = new StoreFile.WriterBuilder(conf, cacheConf,
halfWriter = new StoreFileWriter.Builder(conf, cacheConf,
fs)
.withFilePath(outFile)
.withBloomType(bloomFilterType)

View File

@ -42,8 +42,8 @@ import org.apache.hadoop.hbase.regionserver.MobCompactionStoreScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
@ -86,17 +86,17 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
}
};
private final CellSinkFactory<Writer> writerFactory = new CellSinkFactory<Writer>() {
@Override
public Writer createWriter(InternalScanner scanner,
org.apache.hadoop.hbase.regionserver.compactions.Compactor.FileDetails fd,
boolean shouldDropBehind) throws IOException {
// make this writer with tags always because of possible new cells with tags.
return store.createWriterInTmp(fd.maxKeyCount, compactionCompression, true, true, true,
shouldDropBehind);
}
};
private final CellSinkFactory<StoreFileWriter> writerFactory =
new CellSinkFactory<StoreFileWriter>() {
@Override
public StoreFileWriter createWriter(InternalScanner scanner,
org.apache.hadoop.hbase.regionserver.compactions.Compactor.FileDetails fd,
boolean shouldDropBehind) throws IOException {
// make this writer with tags always because of possible new cells with tags.
return store.createWriterInTmp(fd.maxKeyCount, compactionCompression, true, true, true,
shouldDropBehind);
}
};
public DefaultMobStoreCompactor(Configuration conf, Store store) {
super(conf, store);
@ -180,7 +180,7 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
boolean hasMore;
Path path = MobUtils.getMobFamilyPath(conf, store.getTableName(), store.getColumnFamilyName());
byte[] fileName = null;
Writer mobFileWriter = null, delFileWriter = null;
StoreFileWriter mobFileWriter = null, delFileWriter = null;
long mobCells = 0, deleteMarkersCount = 0;
Tag tableNameTag = new ArrayBackedTag(TagType.MOB_TABLE_NAME_TAG_TYPE,
store.getTableName().getName());

View File

@ -41,7 +41,7 @@ import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.MemStoreSnapshot;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.StringUtils;
@ -108,7 +108,7 @@ public class DefaultMobStoreFlusher extends DefaultStoreFlusher {
if (scanner == null) {
return result; // NULL scanner returned from coprocessor hooks means skip normal processing
}
StoreFile.Writer writer;
StoreFileWriter writer;
try {
// TODO: We can fail in the below block before we complete adding this flush to
// list of store files. Add cleanup of anything put on filesystem if we fail.
@ -155,8 +155,8 @@ public class DefaultMobStoreFlusher extends DefaultStoreFlusher {
* @throws IOException
*/
protected void performMobFlush(MemStoreSnapshot snapshot, long cacheFlushId,
InternalScanner scanner, StoreFile.Writer writer, MonitoredTask status) throws IOException {
StoreFile.Writer mobFileWriter = null;
InternalScanner scanner, StoreFileWriter writer, MonitoredTask status) throws IOException {
StoreFileWriter mobFileWriter = null;
int compactionKVMax = conf.getInt(HConstants.COMPACTION_KV_MAX,
HConstants.COMPACTION_KV_MAX_DEFAULT);
long mobCount = 0;

View File

@ -68,6 +68,7 @@ import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactor;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
@ -462,7 +463,7 @@ public final class MobUtils {
* @return The writer for the mob file.
* @throws IOException
*/
public static StoreFile.Writer createWriter(Configuration conf, FileSystem fs,
public static StoreFileWriter createWriter(Configuration conf, FileSystem fs,
HColumnDescriptor family, String date, Path basePath, long maxKeyCount,
Compression.Algorithm compression, String startKey, CacheConfig cacheConfig,
Encryption.Context cryptoContext)
@ -485,7 +486,7 @@ public final class MobUtils {
* @return The writer for the mob file.
* @throws IOException
*/
public static StoreFile.Writer createRefFileWriter(Configuration conf, FileSystem fs,
public static StoreFileWriter createRefFileWriter(Configuration conf, FileSystem fs,
HColumnDescriptor family, Path basePath, long maxKeyCount, CacheConfig cacheConfig,
Encryption.Context cryptoContext)
throws IOException {
@ -497,7 +498,7 @@ public final class MobUtils {
.withEncryptionContext(cryptoContext).withCreateTime(EnvironmentEdgeManager.currentTime())
.build();
Path tempPath = new Path(basePath, UUID.randomUUID().toString().replaceAll("-", ""));
StoreFile.Writer w = new StoreFile.WriterBuilder(conf, cacheConfig, fs).withFilePath(tempPath)
StoreFileWriter w = new StoreFileWriter.Builder(conf, cacheConfig, fs).withFilePath(tempPath)
.withComparator(CellComparator.COMPARATOR).withBloomType(family.getBloomFilterType())
.withMaxKeyCount(maxKeyCount).withFileContext(hFileContext).build();
return w;
@ -518,7 +519,7 @@ public final class MobUtils {
* @return The writer for the mob file.
* @throws IOException
*/
public static StoreFile.Writer createWriter(Configuration conf, FileSystem fs,
public static StoreFileWriter createWriter(Configuration conf, FileSystem fs,
HColumnDescriptor family, String date, Path basePath, long maxKeyCount,
Compression.Algorithm compression, byte[] startKey, CacheConfig cacheConfig,
Encryption.Context cryptoContext)
@ -544,7 +545,7 @@ public final class MobUtils {
* @return The writer for the del file.
* @throws IOException
*/
public static StoreFile.Writer createDelFileWriter(Configuration conf, FileSystem fs,
public static StoreFileWriter createDelFileWriter(Configuration conf, FileSystem fs,
HColumnDescriptor family, String date, Path basePath, long maxKeyCount,
Compression.Algorithm compression, byte[] startKey, CacheConfig cacheConfig,
Encryption.Context cryptoContext)
@ -570,7 +571,7 @@ public final class MobUtils {
* @return The writer for the mob file.
* @throws IOException
*/
private static StoreFile.Writer createWriter(Configuration conf, FileSystem fs,
private static StoreFileWriter createWriter(Configuration conf, FileSystem fs,
HColumnDescriptor family, MobFileName mobFileName, Path basePath, long maxKeyCount,
Compression.Algorithm compression, CacheConfig cacheConfig, Encryption.Context cryptoContext)
throws IOException {
@ -583,7 +584,7 @@ public final class MobUtils {
.withEncryptionContext(cryptoContext)
.withCreateTime(EnvironmentEdgeManager.currentTime()).build();
StoreFile.Writer w = new StoreFile.WriterBuilder(conf, cacheConfig, fs)
StoreFileWriter w = new StoreFileWriter.Builder(conf, cacheConfig, fs)
.withFilePath(new Path(basePath, mobFileName.getFileName()))
.withComparator(CellComparator.COMPARATOR).withBloomType(BloomType.NONE)
.withMaxKeyCount(maxKeyCount).withFileContext(hFileContext).build();

View File

@ -69,9 +69,9 @@ import org.apache.hadoop.hbase.regionserver.ScanInfo;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.security.EncryptionUtil;
import org.apache.hadoop.hbase.util.Bytes;
@ -385,8 +385,8 @@ public class PartitionedMobCompactor extends MobCompactor {
// Pair(maxSeqId, cellsCount)
Pair<Long, Long> fileInfo = getFileInfo(mobFilesToCompact);
// open writers for the mob files and new ref store files.
Writer writer = null;
Writer refFileWriter = null;
StoreFileWriter writer = null;
StoreFileWriter refFileWriter = null;
Path filePath = null;
Path refFilePath = null;
long mobCells = 0;
@ -499,7 +499,7 @@ public class PartitionedMobCompactor extends MobCompactor {
List<StoreFile> delFiles) throws IOException {
// create a scanner for the del files.
StoreScanner scanner = createScanner(delFiles, ScanType.COMPACT_RETAIN_DELETES);
Writer writer = null;
StoreFileWriter writer = null;
Path filePath = null;
try {
writer = MobUtils.createDelFileWriter(conf, fs, column,
@ -589,7 +589,7 @@ public class PartitionedMobCompactor extends MobCompactor {
* @param mobCellsCount The number of mob cells.
* @throws IOException
*/
private void closeMobFileWriter(Writer writer, long maxSeqId, long mobCellsCount)
private void closeMobFileWriter(StoreFileWriter writer, long maxSeqId, long mobCellsCount)
throws IOException {
if (writer != null) {
writer.appendMetadata(maxSeqId, false, mobCellsCount);
@ -608,7 +608,7 @@ public class PartitionedMobCompactor extends MobCompactor {
* @param bulkloadTime The timestamp at which the bulk load file is created.
* @throws IOException
*/
private void closeRefFileWriter(Writer writer, long maxSeqId, long bulkloadTime)
private void closeRefFileWriter(StoreFileWriter writer, long maxSeqId, long bulkloadTime)
throws IOException {
if (writer != null) {
writer.appendMetadata(maxSeqId, false);

View File

@ -45,7 +45,7 @@ import org.apache.hadoop.hbase.mob.mapreduce.SweepJob.SweepCounter;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.MemStore;
import org.apache.hadoop.hbase.regionserver.MemStoreSnapshot;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.security.EncryptionUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Reducer.Context;
@ -132,7 +132,7 @@ public class MemStoreWrapper {
}
// generate the files into a temp directory.
String tempPathString = context.getConfiguration().get(SweepJob.WORKING_FILES_DIR_KEY);
StoreFile.Writer mobFileWriter = MobUtils.createWriter(conf, fs, hcd, partitionId.getDate(),
StoreFileWriter mobFileWriter = MobUtils.createWriter(conf, fs, hcd, partitionId.getDate(),
new Path(tempPathString), snapshot.getCellsCount(), hcd.getCompactionCompression(),
partitionId.getStartKey(), cacheConfig, cryptoContext);

View File

@ -26,7 +26,6 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
import org.apache.hadoop.hbase.regionserver.compactions.Compactor.CellSink;
/**
@ -44,7 +43,7 @@ public abstract class AbstractMultiFileWriter implements CellSink {
protected StoreScanner sourceScanner;
public interface WriterFactory {
public StoreFile.Writer createWriter() throws IOException;
public StoreFileWriter createWriter() throws IOException;
}
/**
@ -66,13 +65,13 @@ public abstract class AbstractMultiFileWriter implements CellSink {
*/
public List<Path> commitWriters(long maxSeqId, boolean majorCompaction) throws IOException {
preCommitWriters();
Collection<StoreFile.Writer> writers = this.writers();
Collection<StoreFileWriter> writers = this.writers();
if (LOG.isDebugEnabled()) {
LOG.debug("Commit " + writers.size() + " writers, maxSeqId=" + maxSeqId
+ ", majorCompaction=" + majorCompaction);
}
List<Path> paths = new ArrayList<Path>();
for (Writer writer : writers) {
for (StoreFileWriter writer : writers) {
if (writer == null) {
continue;
}
@ -89,7 +88,7 @@ public abstract class AbstractMultiFileWriter implements CellSink {
*/
public List<Path> abortWriters() {
List<Path> paths = new ArrayList<Path>();
for (StoreFile.Writer writer : writers()) {
for (StoreFileWriter writer : writers()) {
try {
if (writer != null) {
paths.add(writer.getPath());
@ -102,7 +101,7 @@ public abstract class AbstractMultiFileWriter implements CellSink {
return paths;
}
protected abstract Collection<StoreFile.Writer> writers();
protected abstract Collection<StoreFileWriter> writers();
/**
* Subclasses override this method to be called at the end of a successful sequence of append; all
@ -115,6 +114,6 @@ public abstract class AbstractMultiFileWriter implements CellSink {
* Subclasses override this method to be called before we close the give writer. Usually you can
* append extra metadata to the writer.
*/
protected void preCloseWriter(StoreFile.Writer writer) throws IOException {
protected void preCloseWriter(StoreFileWriter writer) throws IOException {
}
}
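Concrete multi-file writers obtain their sinks through WriterFactory, which now hands back StoreFileWriter directly. A hedged sketch of a factory that delegates to the store (modeled on the CellSinkFactory used by DefaultMobStoreCompactor earlier in this patch; store, maxKeyCount and compression are assumed to be in scope):

AbstractMultiFileWriter.WriterFactory factory = new AbstractMultiFileWriter.WriterFactory() {
  @Override
  public StoreFileWriter createWriter() throws IOException {
    // Create the writer in the store's tmp directory; the flags follow the compactor example above.
    return store.createWriterInTmp(maxKeyCount, compression, true, true, true, false);
  }
};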

View File

@ -26,7 +26,6 @@ import java.util.TreeMap;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
/**
* class for cell sink that separates the provided cells into multiple files for date tiered
@ -35,8 +34,8 @@ import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
@InterfaceAudience.Private
public class DateTieredMultiFileWriter extends AbstractMultiFileWriter {
private final NavigableMap<Long, StoreFile.Writer> lowerBoundary2Writer
= new TreeMap<Long, StoreFile.Writer>();
private final NavigableMap<Long, StoreFileWriter> lowerBoundary2Writer
= new TreeMap<Long, StoreFileWriter>();
private final boolean needEmptyFile;
@ -53,8 +52,8 @@ public class DateTieredMultiFileWriter extends AbstractMultiFileWriter {
@Override
public void append(Cell cell) throws IOException {
Map.Entry<Long, StoreFile.Writer> entry = lowerBoundary2Writer.floorEntry(cell.getTimestamp());
StoreFile.Writer writer = entry.getValue();
Map.Entry<Long, StoreFileWriter> entry = lowerBoundary2Writer.floorEntry(cell.getTimestamp());
StoreFileWriter writer = entry.getValue();
if (writer == null) {
writer = writerFactory.createWriter();
lowerBoundary2Writer.put(entry.getKey(), writer);
@ -63,7 +62,7 @@ public class DateTieredMultiFileWriter extends AbstractMultiFileWriter {
}
@Override
protected Collection<Writer> writers() {
protected Collection<StoreFileWriter> writers() {
return lowerBoundary2Writer.values();
}
@ -72,7 +71,7 @@ public class DateTieredMultiFileWriter extends AbstractMultiFileWriter {
if (!needEmptyFile) {
return;
}
for (StoreFile.Writer writer : lowerBoundary2Writer.values()) {
for (StoreFileWriter writer : lowerBoundary2Writer.values()) {
if (writer != null) {
return;
}

View File

@ -57,7 +57,7 @@ public class DefaultStoreFlusher extends StoreFlusher {
return result; // NULL scanner returned from coprocessor hooks means skip normal processing
}
StoreFile.Writer writer;
StoreFileWriter writer;
try {
// TODO: We can fail in the below block before we complete adding this flush to
// list of store files. Add cleanup of anything put on filesystem if we fail.

View File

@ -188,7 +188,7 @@ public class HMobStore extends HStore {
* @return The writer for the mob file.
* @throws IOException
*/
public StoreFile.Writer createWriterInTmp(Date date, long maxKeyCount,
public StoreFileWriter createWriterInTmp(Date date, long maxKeyCount,
Compression.Algorithm compression, byte[] startKey) throws IOException {
if (startKey == null) {
startKey = HConstants.EMPTY_START_ROW;
@ -208,7 +208,7 @@ public class HMobStore extends HStore {
* @return The writer for the del file.
* @throws IOException
*/
public StoreFile.Writer createDelFileWriterInTmp(Date date, long maxKeyCount,
public StoreFileWriter createDelFileWriterInTmp(Date date, long maxKeyCount,
Compression.Algorithm compression, byte[] startKey) throws IOException {
if (startKey == null) {
startKey = HConstants.EMPTY_START_ROW;
@ -230,7 +230,7 @@ public class HMobStore extends HStore {
* @return The writer for the mob file.
* @throws IOException
*/
public StoreFile.Writer createWriterInTmp(String date, Path basePath, long maxKeyCount,
public StoreFileWriter createWriterInTmp(String date, Path basePath, long maxKeyCount,
Compression.Algorithm compression, byte[] startKey) throws IOException {
MobFileName mobFileName = MobFileName.create(startKey, date, UUID.randomUUID()
.toString().replaceAll("-", ""));
@ -246,7 +246,7 @@ public class HMobStore extends HStore {
* @return The writer for the mob file.
* @throws IOException
*/
public StoreFile.Writer createWriterInTmp(MobFileName mobFileName, Path basePath,
public StoreFileWriter createWriterInTmp(MobFileName mobFileName, Path basePath,
long maxKeyCount, Compression.Algorithm compression) throws IOException {
final CacheConfig writerCacheConf = mobCacheConfig;
HFileContext hFileContext = new HFileContextBuilder().withCompression(compression)
@ -259,7 +259,7 @@ public class HMobStore extends HStore {
.withEncryptionContext(cryptoContext)
.withCreateTime(EnvironmentEdgeManager.currentTime()).build();
StoreFile.Writer w = new StoreFile.WriterBuilder(conf, writerCacheConf, region.getFilesystem())
StoreFileWriter w = new StoreFileWriter.Builder(conf, writerCacheConf, region.getFilesystem())
.withFilePath(new Path(basePath, mobFileName.getFileName()))
.withComparator(CellComparator.COMPARATOR).withBloomType(BloomType.NONE)
.withMaxKeyCount(maxKeyCount).withFileContext(hFileContext).build();

View File

@ -1691,7 +1691,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
Collection<StoreFile> storeFiles = store.getStorefiles();
if (storeFiles == null) continue;
for (StoreFile file : storeFiles) {
StoreFile.Reader sfReader = file.getReader();
StoreFileReader sfReader = file.getReader();
if (sfReader == null) continue;
HFile.Reader reader = sfReader.getHFileReader();
if (reader == null) continue;

View File

@ -611,7 +611,7 @@ public class HStore implements Store {
info.setRegionCoprocessorHost(this.region.getCoprocessorHost());
StoreFile storeFile = new StoreFile(this.getFileSystem(), info, this.conf, this.cacheConf,
this.family.getBloomFilterType());
StoreFile.Reader r = storeFile.createReader();
StoreFileReader r = storeFile.createReader();
r.setReplicaStoreFile(isPrimaryReplicaStore());
return storeFile;
}
@ -749,7 +749,7 @@ public class HStore implements Store {
}
private void bulkLoadHFile(StoreFile sf) throws IOException {
StoreFile.Reader r = sf.getReader();
StoreFileReader r = sf.getReader();
this.storeSize += r.length();
this.totalUncompressedBytes += r.getTotalUncompressedBytes();
@ -917,7 +917,7 @@ public class HStore implements Store {
status.setStatus("Flushing " + this + ": reopening flushed file");
StoreFile sf = createStoreFileAndReader(dstPath);
StoreFile.Reader r = sf.getReader();
StoreFileReader r = sf.getReader();
this.storeSize += r.length();
this.totalUncompressedBytes += r.getTotalUncompressedBytes();
@ -930,7 +930,7 @@ public class HStore implements Store {
}
@Override
public StoreFile.Writer createWriterInTmp(long maxKeyCount, Compression.Algorithm compression,
public StoreFileWriter createWriterInTmp(long maxKeyCount, Compression.Algorithm compression,
boolean isCompaction, boolean includeMVCCReadpoint,
boolean includesTag)
throws IOException {
@ -947,7 +947,7 @@ public class HStore implements Store {
* @return Writer for a new StoreFile in the tmp dir.
*/
@Override
public StoreFile.Writer createWriterInTmp(long maxKeyCount, Compression.Algorithm compression,
public StoreFileWriter createWriterInTmp(long maxKeyCount, Compression.Algorithm compression,
boolean isCompaction, boolean includeMVCCReadpoint, boolean includesTag,
boolean shouldDropBehind)
throws IOException {
@ -966,7 +966,7 @@ public class HStore implements Store {
}
HFileContext hFileContext = createFileContext(compression, includeMVCCReadpoint, includesTag,
cryptoContext);
StoreFile.Writer w = new StoreFile.WriterBuilder(conf, writerCacheConf,
StoreFileWriter w = new StoreFileWriter.Builder(conf, writerCacheConf,
this.getFileSystem())
.withFilePath(fs.createTempName())
.withComparator(comparator)
@ -1749,7 +1749,7 @@ public class HStore implements Store {
this.storeSize = 0L;
this.totalUncompressedBytes = 0L;
for (StoreFile hsf : this.storeEngine.getStoreFileManager().getStorefiles()) {
StoreFile.Reader r = hsf.getReader();
StoreFileReader r = hsf.getReader();
if (r == null) {
LOG.warn("StoreFile " + hsf + " has a null Reader");
continue;
@ -1900,7 +1900,7 @@ public class HStore implements Store {
public long getMaxStoreFileAge() {
long earliestTS = Long.MAX_VALUE;
for (StoreFile s: this.storeEngine.getStoreFileManager().getStorefiles()) {
StoreFile.Reader r = s.getReader();
StoreFileReader r = s.getReader();
if (r == null) {
LOG.warn("StoreFile " + s + " has a null Reader");
continue;
@ -1919,7 +1919,7 @@ public class HStore implements Store {
public long getMinStoreFileAge() {
long latestTS = 0;
for (StoreFile s: this.storeEngine.getStoreFileManager().getStorefiles()) {
StoreFile.Reader r = s.getReader();
StoreFileReader r = s.getReader();
if (r == null) {
LOG.warn("StoreFile " + s + " has a null Reader");
continue;
@ -1938,7 +1938,7 @@ public class HStore implements Store {
public long getAvgStoreFileAge() {
long sum = 0, count = 0;
for (StoreFile s: this.storeEngine.getStoreFileManager().getStorefiles()) {
StoreFile.Reader r = s.getReader();
StoreFileReader r = s.getReader();
if (r == null) {
LOG.warn("StoreFile " + s + " has a null Reader");
continue;
@ -1988,7 +1988,7 @@ public class HStore implements Store {
public long getStorefilesSize() {
long size = 0;
for (StoreFile s: this.storeEngine.getStoreFileManager().getStorefiles()) {
StoreFile.Reader r = s.getReader();
StoreFileReader r = s.getReader();
if (r == null) {
LOG.warn("StoreFile " + s + " has a null Reader");
continue;
@ -2002,7 +2002,7 @@ public class HStore implements Store {
public long getStorefilesIndexSize() {
long size = 0;
for (StoreFile s: this.storeEngine.getStoreFileManager().getStorefiles()) {
StoreFile.Reader r = s.getReader();
StoreFileReader r = s.getReader();
if (r == null) {
LOG.warn("StoreFile " + s + " has a null Reader");
continue;
@ -2016,7 +2016,7 @@ public class HStore implements Store {
public long getTotalStaticIndexSize() {
long size = 0;
for (StoreFile s : this.storeEngine.getStoreFileManager().getStorefiles()) {
StoreFile.Reader r = s.getReader();
StoreFileReader r = s.getReader();
if (r == null) {
continue;
}
@ -2029,7 +2029,7 @@ public class HStore implements Store {
public long getTotalStaticBloomSize() {
long size = 0;
for (StoreFile s : this.storeEngine.getStoreFileManager().getStorefiles()) {
StoreFile.Reader r = s.getReader();
StoreFileReader r = s.getReader();
if (r == null) {
continue;
}
@ -2430,7 +2430,7 @@ public class HStore implements Store {
for (final StoreFile file : compactedfiles) {
synchronized (file) {
try {
StoreFile.Reader r = file.getReader();
StoreFileReader r = file.getReader();
if (r == null) {
if (LOG.isDebugEnabled()) {
LOG.debug("The file " + file + " was closed but still not archived.");

View File

@ -1541,11 +1541,11 @@ public class RegionCoprocessorHost
* default behavior, null otherwise
* @throws IOException
*/
public StoreFile.Reader preStoreFileReaderOpen(final FileSystem fs, final Path p,
public StoreFileReader preStoreFileReaderOpen(final FileSystem fs, final Path p,
final FSDataInputStreamWrapper in, final long size, final CacheConfig cacheConf,
final Reference r) throws IOException {
return execOperationWithResult(null,
coprocessors.isEmpty() ? null : new RegionOperationWithResult<StoreFile.Reader>() {
coprocessors.isEmpty() ? null : new RegionOperationWithResult<StoreFileReader>() {
@Override
public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
throws IOException {
@ -1565,11 +1565,11 @@ public class RegionCoprocessorHost
* @return The reader to use
* @throws IOException
*/
public StoreFile.Reader postStoreFileReaderOpen(final FileSystem fs, final Path p,
public StoreFileReader postStoreFileReaderOpen(final FileSystem fs, final Path p,
final FSDataInputStreamWrapper in, final long size, final CacheConfig cacheConf,
final Reference r, final StoreFile.Reader reader) throws IOException {
final Reference r, final StoreFileReader reader) throws IOException {
return execOperationWithResult(reader,
coprocessors.isEmpty() ? null : new RegionOperationWithResult<StoreFile.Reader>() {
coprocessors.isEmpty() ? null : new RegionOperationWithResult<StoreFileReader>() {
@Override
public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
throws IOException {

View File

@ -164,7 +164,7 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf
* @param includeMVCCReadpoint whether we should out the MVCC readpoint
* @return Writer for a new StoreFile in the tmp dir.
*/
StoreFile.Writer createWriterInTmp(
StoreFileWriter createWriterInTmp(
long maxKeyCount,
Compression.Algorithm compression,
boolean isCompaction,
@ -180,7 +180,7 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf
* @param shouldDropBehind should the writer drop caches behind writes
* @return Writer for a new StoreFile in the tmp dir.
*/
StoreFile.Writer createWriterInTmp(
StoreFileWriter createWriterInTmp(
long maxKeyCount,
Compression.Algorithm compression,
boolean isCompaction,

View File

@ -233,7 +233,7 @@ public class StoreFileInfo {
* @param cacheConf The cache configuration and block cache reference.
* @return The StoreFile.Reader for the file
*/
public StoreFile.Reader open(final FileSystem fs,
public StoreFileReader open(final FileSystem fs,
final CacheConfig cacheConf, final boolean canUseDropBehind) throws IOException {
FSDataInputStreamWrapper in;
FileStatus status;
@ -257,7 +257,7 @@ public class StoreFileInfo {
long length = status.getLen();
hdfsBlocksDistribution = computeHDFSBlocksDistribution(fs);
StoreFile.Reader reader = null;
StoreFileReader reader = null;
if (this.coprocessorHost != null) {
reader = this.coprocessorHost.preStoreFileReaderOpen(fs, this.getPath(), in, length,
cacheConf, reference);
@ -267,7 +267,7 @@ public class StoreFileInfo {
reader = new HalfStoreFileReader(fs, this.getPath(), in, length, cacheConf, reference,
conf);
} else {
reader = new StoreFile.Reader(fs, status.getPath(), in, length, cacheConf, conf);
reader = new StoreFileReader(fs, status.getPath(), in, length, cacheConf, conf);
}
}
if (this.coprocessorHost != null) {

View File

@ -0,0 +1,647 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileBlock;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.util.BloomFilter;
import org.apache.hadoop.hbase.util.BloomFilterFactory;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.DataInput;
import java.io.IOException;
import java.util.Map;
import java.util.SortedSet;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Reader for a StoreFile.
*/
@InterfaceAudience.Private
public class StoreFileReader {
private static final Log LOG = LogFactory.getLog(StoreFileReader.class.getName());
protected BloomFilter generalBloomFilter = null;
protected BloomFilter deleteFamilyBloomFilter = null;
protected BloomType bloomFilterType;
private final HFile.Reader reader;
protected TimeRangeTracker timeRangeTracker = null;
protected long sequenceID = -1;
private byte[] lastBloomKey;
private long deleteFamilyCnt = -1;
private boolean bulkLoadResult = false;
private KeyValue.KeyOnlyKeyValue lastBloomKeyOnlyKV = null;
private boolean skipResetSeqId = true;
public AtomicInteger getRefCount() {
return refCount;
}
// Counter that is incremented every time a scanner is created on the
// store file. It is decremented when the scan on the store file is
// done.
private AtomicInteger refCount = new AtomicInteger(0);
// Indicates if the file got compacted
private volatile boolean compactedAway = false;
public StoreFileReader(FileSystem fs, Path path, CacheConfig cacheConf, Configuration conf)
throws IOException {
reader = HFile.createReader(fs, path, cacheConf, conf);
bloomFilterType = BloomType.NONE;
}
void markCompactedAway() {
this.compactedAway = true;
}
public StoreFileReader(FileSystem fs, Path path, FSDataInputStreamWrapper in, long size,
CacheConfig cacheConf, Configuration conf) throws IOException {
reader = HFile.createReader(fs, path, in, size, cacheConf, conf);
bloomFilterType = BloomType.NONE;
}
public void setReplicaStoreFile(boolean isPrimaryReplicaStoreFile) {
reader.setPrimaryReplicaReader(isPrimaryReplicaStoreFile);
}
public boolean isPrimaryReplicaReader() {
return reader.isPrimaryReplicaReader();
}
/**
* ONLY USE DEFAULT CONSTRUCTOR FOR UNIT TESTS
*/
StoreFileReader() {
this.reader = null;
}
public CellComparator getComparator() {
return reader.getComparator();
}
/**
* Get a scanner to scan over this StoreFile. Do not use
* this overload if using this scanner for compactions.
*
* @param cacheBlocks should this scanner cache blocks?
* @param pread use pread (for highly concurrent small readers)
* @return a scanner
*/
public StoreFileScanner getStoreFileScanner(boolean cacheBlocks,
boolean pread) {
return getStoreFileScanner(cacheBlocks, pread, false,
// 0 is passed as readpoint because this method is only used by test
// where StoreFile is directly operated upon
0);
}
/**
* Get a scanner to scan over this StoreFile.
*
* @param cacheBlocks should this scanner cache blocks?
* @param pread use pread (for highly concurrent small readers)
* @param isCompaction is scanner being used for compaction?
* @return a scanner
*/
public StoreFileScanner getStoreFileScanner(boolean cacheBlocks,
boolean pread,
boolean isCompaction, long readPt) {
// Increment the ref count
refCount.incrementAndGet();
return new StoreFileScanner(this,
getScanner(cacheBlocks, pread, isCompaction),
!isCompaction, reader.hasMVCCInfo(), readPt);
}
/**
* Decrement the ref count associated with the reader when ever a scanner associated
* with the reader is closed
*/
void decrementRefCount() {
refCount.decrementAndGet();
}
/**
* @return true if the file is still used in reads
*/
public boolean isReferencedInReads() {
return refCount.get() != 0;
}
/**
* @return true if the file is compacted
*/
public boolean isCompactedAway() {
return this.compactedAway;
}
/**
* @deprecated Do not write further code which depends on this call. Instead
* use getStoreFileScanner() which uses the StoreFileScanner class/interface
* which is the preferred way to scan a store with higher level concepts.
*
* @param cacheBlocks should we cache the blocks?
* @param pread use pread (for concurrent small readers)
* @return the underlying HFileScanner
*/
@Deprecated
public HFileScanner getScanner(boolean cacheBlocks, boolean pread) {
return getScanner(cacheBlocks, pread, false);
}
/**
* @deprecated Do not write further code which depends on this call. Instead
* use getStoreFileScanner() which uses the StoreFileScanner class/interface
* which is the preferred way to scan a store with higher level concepts.
*
* @param cacheBlocks
* should we cache the blocks?
* @param pread
* use pread (for concurrent small readers)
* @param isCompaction
* is scanner being used for compaction?
* @return the underlying HFileScanner
*/
@Deprecated
public HFileScanner getScanner(boolean cacheBlocks, boolean pread,
boolean isCompaction) {
return reader.getScanner(cacheBlocks, pread, isCompaction);
}
public void close(boolean evictOnClose) throws IOException {
reader.close(evictOnClose);
}
/**
* Check if this storeFile may contain keys within the TimeRange that
* have not expired (i.e. not older than oldestUnexpiredTS).
* @param timeRange the timeRange to restrict
* @param oldestUnexpiredTS the oldest timestamp that is not expired, as
* determined by the column family's TTL
* @return false if queried keys definitely don't exist in this StoreFile
*/
boolean passesTimerangeFilter(TimeRange timeRange, long oldestUnexpiredTS) {
if (timeRangeTracker == null) {
return true;
} else {
return timeRangeTracker.includesTimeRange(timeRange) &&
timeRangeTracker.getMaximumTimestamp() >= oldestUnexpiredTS;
}
}
/**
* Checks whether the given scan passes the Bloom filter (if present). Only
* checks Bloom filters for single-row or single-row-column scans. Bloom
* filter checking for multi-gets is implemented as part of the store
* scanner system (see {@link StoreFileScanner#seekExactly}) and uses
* the lower-level API {@link #passesGeneralRowBloomFilter(byte[], int, int)}
* and {@link #passesGeneralRowColBloomFilter(Cell)}.
*
* @param scan the scan specification. Used to determine the row, and to
* check whether this is a single-row ("get") scan.
* @param columns the set of columns. Only used for row-column Bloom
* filters.
* @return true if the scan with the given column set passes the Bloom
* filter, or if the Bloom filter is not applicable for the scan.
* False if the Bloom filter is applicable and the scan fails it.
*/
boolean passesBloomFilter(Scan scan, final SortedSet<byte[]> columns) {
// Multi-column non-get scans will use Bloom filters through the
// lower-level API function that this function calls.
if (!scan.isGetScan()) {
return true;
}
byte[] row = scan.getStartRow();
switch (this.bloomFilterType) {
case ROW:
return passesGeneralRowBloomFilter(row, 0, row.length);
case ROWCOL:
if (columns != null && columns.size() == 1) {
byte[] column = columns.first();
// create the required fake key
Cell kvKey = KeyValueUtil.createFirstOnRow(row, 0, row.length,
HConstants.EMPTY_BYTE_ARRAY, 0, 0, column, 0,
column.length);
return passesGeneralRowColBloomFilter(kvKey);
}
// For multi-column queries the Bloom filter is checked from the
// seekExact operation.
return true;
default:
return true;
}
}
public boolean passesDeleteFamilyBloomFilter(byte[] row, int rowOffset,
int rowLen) {
// Cache Bloom filter as a local variable in case it is set to null by
// another thread on an IO error.
BloomFilter bloomFilter = this.deleteFamilyBloomFilter;
// Empty file or there is no delete family at all
if (reader.getTrailer().getEntryCount() == 0 || deleteFamilyCnt == 0) {
return false;
}
if (bloomFilter == null) {
return true;
}
try {
if (!bloomFilter.supportsAutoLoading()) {
return true;
}
return bloomFilter.contains(row, rowOffset, rowLen, null);
} catch (IllegalArgumentException e) {
LOG.error("Bad Delete Family bloom filter data -- proceeding without",
e);
setDeleteFamilyBloomFilterFaulty();
}
return true;
}
/**
* A method for checking Bloom filters. Called directly from
* StoreFileScanner in case of a multi-column query.
*
* @return True if passes
*/
public boolean passesGeneralRowBloomFilter(byte[] row, int rowOffset, int rowLen) {
BloomFilter bloomFilter = this.generalBloomFilter;
if (bloomFilter == null) {
return true;
}
// Used in ROW bloom
byte[] key = null;
if (rowOffset != 0 || rowLen != row.length) {
throw new AssertionError(
"For row-only Bloom filters the row " + "must occupy the whole array");
}
key = row;
return checkGeneralBloomFilter(key, null, bloomFilter);
}
/**
* A method for checking Bloom filters. Called directly from
* StoreFileScanner in case of a multi-column query.
*
* @param cell
* the cell to check if present in BloomFilter
* @return True if passes
*/
public boolean passesGeneralRowColBloomFilter(Cell cell) {
BloomFilter bloomFilter = this.generalBloomFilter;
if (bloomFilter == null) {
return true;
}
// Used in ROW_COL bloom
Cell kvKey = null;
// Already if the incoming key is a fake rowcol key then use it as it is
if (cell.getTypeByte() == KeyValue.Type.Maximum.getCode() && cell.getFamilyLength() == 0) {
kvKey = cell;
} else {
kvKey = CellUtil.createFirstOnRowCol(cell);
}
return checkGeneralBloomFilter(null, kvKey, bloomFilter);
}
private boolean checkGeneralBloomFilter(byte[] key, Cell kvKey, BloomFilter bloomFilter) {
// Empty file
if (reader.getTrailer().getEntryCount() == 0) {
return false;
}
HFileBlock bloomBlock = null;
try {
boolean shouldCheckBloom;
ByteBuff bloom;
if (bloomFilter.supportsAutoLoading()) {
bloom = null;
shouldCheckBloom = true;
} else {
bloomBlock = reader.getMetaBlock(HFile.BLOOM_FILTER_DATA_KEY, true);
bloom = bloomBlock.getBufferWithoutHeader();
shouldCheckBloom = bloom != null;
}
if (shouldCheckBloom) {
boolean exists;
// Whether the primary Bloom key is greater than the last Bloom key
// from the file info. For row-column Bloom filters this is not yet
// a sufficient condition to return false.
boolean keyIsAfterLast = (lastBloomKey != null);
// hbase:meta does not have blooms. So we need not have special interpretation
// of the hbase:meta cells. We can safely use Bytes.BYTES_RAWCOMPARATOR for ROW Bloom
if (keyIsAfterLast) {
if (bloomFilterType == BloomType.ROW) {
keyIsAfterLast = (Bytes.BYTES_RAWCOMPARATOR.compare(key, lastBloomKey) > 0);
} else {
keyIsAfterLast = (CellComparator.COMPARATOR.compare(kvKey, lastBloomKeyOnlyKV)) > 0;
}
}
if (bloomFilterType == BloomType.ROWCOL) {
// Since a Row Delete is essentially a DeleteFamily applied to all
// columns, a file might be skipped if using row+col Bloom filter.
// In order to ensure this file is included an additional check is
// required looking only for a row bloom.
Cell rowBloomKey = CellUtil.createFirstOnRow(kvKey);
// hbase:meta does not have blooms. So we need not have special interpretation
// of the hbase:meta cells. We can safely use Bytes.BYTES_RAWCOMPARATOR for ROW Bloom
if (keyIsAfterLast
&& (CellComparator.COMPARATOR.compare(rowBloomKey, lastBloomKeyOnlyKV)) > 0) {
exists = false;
} else {
exists =
bloomFilter.contains(kvKey, bloom) ||
bloomFilter.contains(rowBloomKey, bloom);
}
} else {
exists = !keyIsAfterLast
&& bloomFilter.contains(key, 0, key.length, bloom);
}
return exists;
}
} catch (IOException e) {
LOG.error("Error reading bloom filter data -- proceeding without",
e);
setGeneralBloomFilterFaulty();
} catch (IllegalArgumentException e) {
LOG.error("Bad bloom filter data -- proceeding without", e);
setGeneralBloomFilterFaulty();
} finally {
// Return the bloom block so that its ref count can be decremented.
reader.returnBlock(bloomBlock);
}
return true;
}
/**
* Checks whether the given scan rowkey range overlaps with the current storefile's
* @param scan the scan specification. Used to determine the rowkey range.
* @return true if there is overlap, false otherwise
*/
public boolean passesKeyRangeFilter(Scan scan) {
if (this.getFirstKey() == null || this.getLastKey() == null) {
// the file is empty
return false;
}
if (Bytes.equals(scan.getStartRow(), HConstants.EMPTY_START_ROW)
&& Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW)) {
return true;
}
byte[] smallestScanRow = scan.isReversed() ? scan.getStopRow() : scan.getStartRow();
byte[] largestScanRow = scan.isReversed() ? scan.getStartRow() : scan.getStopRow();
Cell firstKeyKV = this.getFirstKey();
Cell lastKeyKV = this.getLastKey();
boolean nonOverLapping = (getComparator().compareRows(firstKeyKV,
largestScanRow, 0, largestScanRow.length) > 0
&& !Bytes
.equals(scan.isReversed() ? scan.getStartRow() : scan.getStopRow(),
HConstants.EMPTY_END_ROW))
|| getComparator().compareRows(lastKeyKV, smallestScanRow, 0, smallestScanRow.length) < 0;
return !nonOverLapping;
}
public Map<byte[], byte[]> loadFileInfo() throws IOException {
Map<byte [], byte []> fi = reader.loadFileInfo();
byte[] b = fi.get(StoreFile.BLOOM_FILTER_TYPE_KEY);
if (b != null) {
bloomFilterType = BloomType.valueOf(Bytes.toString(b));
}
lastBloomKey = fi.get(StoreFile.LAST_BLOOM_KEY);
if(bloomFilterType == BloomType.ROWCOL) {
lastBloomKeyOnlyKV = new KeyValue.KeyOnlyKeyValue(lastBloomKey, 0, lastBloomKey.length);
}
byte[] cnt = fi.get(StoreFile.DELETE_FAMILY_COUNT);
if (cnt != null) {
deleteFamilyCnt = Bytes.toLong(cnt);
}
return fi;
}
public void loadBloomfilter() {
this.loadBloomfilter(BlockType.GENERAL_BLOOM_META);
this.loadBloomfilter(BlockType.DELETE_FAMILY_BLOOM_META);
}
public void loadBloomfilter(BlockType blockType) {
try {
if (blockType == BlockType.GENERAL_BLOOM_META) {
if (this.generalBloomFilter != null)
return; // Bloom has been loaded
DataInput bloomMeta = reader.getGeneralBloomFilterMetadata();
if (bloomMeta != null) {
// sanity check for NONE Bloom filter
if (bloomFilterType == BloomType.NONE) {
throw new IOException(
"valid bloom filter type not found in FileInfo");
} else {
generalBloomFilter = BloomFilterFactory.createFromMeta(bloomMeta,
reader);
if (LOG.isTraceEnabled()) {
LOG.trace("Loaded " + bloomFilterType.toString() + " "
+ generalBloomFilter.getClass().getSimpleName()
+ " metadata for " + reader.getName());
}
}
}
} else if (blockType == BlockType.DELETE_FAMILY_BLOOM_META) {
if (this.deleteFamilyBloomFilter != null)
return; // Bloom has been loaded
DataInput bloomMeta = reader.getDeleteBloomFilterMetadata();
if (bloomMeta != null) {
deleteFamilyBloomFilter = BloomFilterFactory.createFromMeta(
bloomMeta, reader);
LOG.info("Loaded Delete Family Bloom ("
+ deleteFamilyBloomFilter.getClass().getSimpleName()
+ ") metadata for " + reader.getName());
}
} else {
throw new RuntimeException("Block Type: " + blockType.toString()
+ "is not supported for Bloom filter");
}
} catch (IOException e) {
LOG.error("Error reading bloom filter meta for " + blockType
+ " -- proceeding without", e);
setBloomFilterFaulty(blockType);
} catch (IllegalArgumentException e) {
LOG.error("Bad bloom filter meta " + blockType
+ " -- proceeding without", e);
setBloomFilterFaulty(blockType);
}
}
private void setBloomFilterFaulty(BlockType blockType) {
if (blockType == BlockType.GENERAL_BLOOM_META) {
setGeneralBloomFilterFaulty();
} else if (blockType == BlockType.DELETE_FAMILY_BLOOM_META) {
setDeleteFamilyBloomFilterFaulty();
}
}
/**
* The number of Bloom filter entries in this store file, or an estimate
* thereof, if the Bloom filter is not loaded. This always returns an upper
* bound of the number of Bloom filter entries.
*
* @return an estimate of the number of Bloom filter entries in this file
*/
public long getFilterEntries() {
return generalBloomFilter != null ? generalBloomFilter.getKeyCount()
: reader.getEntries();
}
public void setGeneralBloomFilterFaulty() {
generalBloomFilter = null;
}
public void setDeleteFamilyBloomFilterFaulty() {
this.deleteFamilyBloomFilter = null;
}
public Cell getLastKey() {
return reader.getLastKey();
}
public byte[] getLastRowKey() {
return reader.getLastRowKey();
}
public Cell midkey() throws IOException {
return reader.midkey();
}
public long length() {
return reader.length();
}
public long getTotalUncompressedBytes() {
return reader.getTrailer().getTotalUncompressedBytes();
}
public long getEntries() {
return reader.getEntries();
}
public long getDeleteFamilyCnt() {
return deleteFamilyCnt;
}
public Cell getFirstKey() {
return reader.getFirstKey();
}
public long indexSize() {
return reader.indexSize();
}
public BloomType getBloomFilterType() {
return this.bloomFilterType;
}
public long getSequenceID() {
return sequenceID;
}
public void setSequenceID(long sequenceID) {
this.sequenceID = sequenceID;
}
public void setBulkLoaded(boolean bulkLoadResult) {
this.bulkLoadResult = bulkLoadResult;
}
public boolean isBulkLoaded() {
return this.bulkLoadResult;
}
BloomFilter getGeneralBloomFilter() {
return generalBloomFilter;
}
long getUncompressedDataIndexSize() {
return reader.getTrailer().getUncompressedDataIndexSize();
}
public long getTotalBloomSize() {
if (generalBloomFilter == null)
return 0;
return generalBloomFilter.getByteSize();
}
public int getHFileVersion() {
return reader.getTrailer().getMajorVersion();
}
public int getHFileMinorVersion() {
return reader.getTrailer().getMinorVersion();
}
public HFile.Reader getHFileReader() {
return reader;
}
void disableBloomFilterForTesting() {
generalBloomFilter = null;
this.deleteFamilyBloomFilter = null;
}
public long getMaxTimestamp() {
return timeRangeTracker == null ? Long.MAX_VALUE : timeRangeTracker.getMaximumTimestamp();
}
boolean isSkipResetSeqId() {
return skipResetSeqId;
}
void setSkipResetSeqId(boolean skipResetSeqId) {
this.skipResetSeqId = skipResetSeqId;
}
}
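End to end, the extracted reader is used much as the old inner class was: open it over an HFile, load file info and Bloom data, then hand out scanners. A minimal sketch under stated assumptions (the path is hypothetical; conf, fs and cacheConf are assumed; the constructor, loadFileInfo, loadBloomfilter, getStoreFileScanner and close calls are the ones defined above):

Configuration conf = HBaseConfiguration.create();
FileSystem fs = FileSystem.get(conf);
CacheConfig cacheConf = new CacheConfig(conf);
Path hfilePath = new Path("/tmp/example-storefile");   // hypothetical path to an existing HFile
StoreFileReader reader = new StoreFileReader(fs, hfilePath, cacheConf, conf);
reader.loadFileInfo();      // picks up bloom filter type, last bloom key, delete family count
reader.loadBloomfilter();   // loads general and delete-family Bloom filters if present
StoreFileScanner scanner = reader.getStoreFileScanner(true, false, false, 0 /* readPt, illustrative */);
try {
  // ... seek/next through the scanner ...
} finally {
  scanner.close();
  reader.close(true);       // evict cached blocks on close
}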

View File

@ -35,7 +35,6 @@ import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.regionserver.StoreFile.Reader;
import org.apache.hadoop.hbase.util.Counter;
/**
@ -45,7 +44,7 @@ import org.apache.hadoop.hbase.util.Counter;
@InterfaceAudience.LimitedPrivate("Coprocessor")
public class StoreFileScanner implements KeyValueScanner {
// the reader it comes from:
private final StoreFile.Reader reader;
private final StoreFileReader reader;
private final HFileScanner hfs;
private Cell cur = null;
private boolean closed = false;
@ -70,7 +69,7 @@ public class StoreFileScanner implements KeyValueScanner {
* Implements a {@link KeyValueScanner} on top of the specified {@link HFileScanner}
* @param hfs HFile scanner
*/
public StoreFileScanner(StoreFile.Reader reader, HFileScanner hfs, boolean useMVCC,
public StoreFileScanner(StoreFileReader reader, HFileScanner hfs, boolean useMVCC,
boolean hasMVCC, long readPt) {
this.readPt = readPt;
this.reader = reader;
@ -117,7 +116,7 @@ public class StoreFileScanner implements KeyValueScanner {
List<StoreFileScanner> scanners = new ArrayList<StoreFileScanner>(
files.size());
for (StoreFile file : files) {
StoreFile.Reader r = file.createReader(canUseDrop);
StoreFileReader r = file.createReader(canUseDrop);
r.setReplicaStoreFile(isPrimaryReplica);
StoreFileScanner scanner = r.getStoreFileScanner(cacheBlocks, usePread,
isCompaction, readPt);
@ -384,7 +383,7 @@ public class StoreFileScanner implements KeyValueScanner {
return true;
}
Reader getReader() {
StoreFileReader getReader() {
return reader;
}

View File

@ -0,0 +1,514 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import com.google.common.base.Preconditions;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
import org.apache.hadoop.hbase.util.BloomFilterFactory;
import org.apache.hadoop.hbase.util.BloomFilterWriter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.WritableUtils;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Arrays;
/**
* A StoreFile writer. Use this to write HBase Store Files. It is package-local
* because it is an implementation detail of the HBase regionserver.
*/
@InterfaceAudience.Private
public class StoreFileWriter implements Compactor.CellSink {
private static final Log LOG = LogFactory.getLog(StoreFileWriter.class.getName());
private final BloomFilterWriter generalBloomFilterWriter;
private final BloomFilterWriter deleteFamilyBloomFilterWriter;
private final BloomType bloomType;
private byte[] lastBloomKey;
private int lastBloomKeyOffset, lastBloomKeyLen;
private Cell lastCell = null;
private long earliestPutTs = HConstants.LATEST_TIMESTAMP;
private Cell lastDeleteFamilyCell = null;
private long deleteFamilyCnt = 0;
TimeRangeTracker timeRangeTracker = new TimeRangeTracker();
/* isTimeRangeTrackerSet keeps track of whether the time range has already been set.
* When flushing a memstore, we set the TimeRange and use this variable to
* indicate that it does not need to be calculated again while
* appending KeyValues.
* It is not set for compactions, where it is recalculated using only
* the appended KeyValues. */
boolean isTimeRangeTrackerSet = false;
protected HFile.Writer writer;
private KeyValue.KeyOnlyKeyValue lastBloomKeyOnlyKV = null;
/**
* Creates an HFile.Writer that also writes helpful metadata.
* @param fs file system to write to
* @param path file name to create
* @param conf user configuration
* @param comparator key comparator
* @param bloomType bloom filter setting
* @param maxKeys the expected maximum number of keys to be added. Was used
* for Bloom filter size in {@link HFile} format version 1.
* @param fileContext - The HFile context
* @throws IOException problem writing to FS
*/
StoreFileWriter(FileSystem fs, Path path, final Configuration conf, CacheConfig cacheConf,
final CellComparator comparator, BloomType bloomType, long maxKeys,
InetSocketAddress[] favoredNodes, HFileContext fileContext)
throws IOException {
writer = HFile.getWriterFactory(conf, cacheConf)
.withPath(fs, path)
.withComparator(comparator)
.withFavoredNodes(favoredNodes)
.withFileContext(fileContext)
.create();
generalBloomFilterWriter = BloomFilterFactory.createGeneralBloomAtWrite(
conf, cacheConf, bloomType,
(int) Math.min(maxKeys, Integer.MAX_VALUE), writer);
if (generalBloomFilterWriter != null) {
this.bloomType = bloomType;
if(this.bloomType == BloomType.ROWCOL) {
lastBloomKeyOnlyKV = new KeyValue.KeyOnlyKeyValue();
}
if (LOG.isTraceEnabled()) {
LOG.trace("Bloom filter type for " + path + ": " + this.bloomType + ", " +
generalBloomFilterWriter.getClass().getSimpleName());
}
} else {
// Not using Bloom filters.
this.bloomType = BloomType.NONE;
}
// initialize delete family Bloom filter when there is NO RowCol Bloom
// filter
if (this.bloomType != BloomType.ROWCOL) {
this.deleteFamilyBloomFilterWriter = BloomFilterFactory
.createDeleteBloomAtWrite(conf, cacheConf,
(int) Math.min(maxKeys, Integer.MAX_VALUE), writer);
} else {
deleteFamilyBloomFilterWriter = null;
}
if (deleteFamilyBloomFilterWriter != null && LOG.isTraceEnabled()) {
LOG.trace("Delete Family Bloom filter type for " + path + ": " +
deleteFamilyBloomFilterWriter.getClass().getSimpleName());
}
}
/**
* Writes metadata.
* Call before {@link #close()} since it is written as metadata to this file.
* @param maxSequenceId Maximum sequence id.
* @param majorCompaction True if this file is product of a major compaction
* @throws IOException problem writing to FS
*/
public void appendMetadata(final long maxSequenceId, final boolean majorCompaction)
throws IOException {
writer.appendFileInfo(StoreFile.MAX_SEQ_ID_KEY, Bytes.toBytes(maxSequenceId));
writer.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY,
Bytes.toBytes(majorCompaction));
appendTrackedTimestampsToMetadata();
}
/**
* Writes metadata.
* Call before {@link #close()} since it is written as metadata to this file.
* @param maxSequenceId Maximum sequence id.
* @param majorCompaction True if this file is product of a major compaction
* @param mobCellsCount The number of mob cells.
* @throws IOException problem writing to FS
*/
public void appendMetadata(final long maxSequenceId, final boolean majorCompaction,
final long mobCellsCount) throws IOException {
writer.appendFileInfo(StoreFile.MAX_SEQ_ID_KEY, Bytes.toBytes(maxSequenceId));
writer.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY, Bytes.toBytes(majorCompaction));
writer.appendFileInfo(StoreFile.MOB_CELLS_COUNT, Bytes.toBytes(mobCellsCount));
appendTrackedTimestampsToMetadata();
}
/**
* Add TimeRange and earliest Put timestamp to metadata.
*/
public void appendTrackedTimestampsToMetadata() throws IOException {
appendFileInfo(StoreFile.TIMERANGE_KEY, WritableUtils.toByteArray(timeRangeTracker));
appendFileInfo(StoreFile.EARLIEST_PUT_TS, Bytes.toBytes(earliestPutTs));
}
/**
* Set TimeRangeTracker
*/
public void setTimeRangeTracker(final TimeRangeTracker trt) {
this.timeRangeTracker = trt;
isTimeRangeTrackerSet = true;
}
/**
* Record the earliest Put timestamp.
*
* If the timeRangeTracker is not set,
* update the TimeRangeTracker to include the timestamp of this key.
*/
public void trackTimestamps(final Cell cell) {
if (KeyValue.Type.Put.getCode() == cell.getTypeByte()) {
earliestPutTs = Math.min(earliestPutTs, cell.getTimestamp());
}
if (!isTimeRangeTrackerSet) {
timeRangeTracker.includeTimestamp(cell);
}
}
private void appendGeneralBloomfilter(final Cell cell) throws IOException {
if (this.generalBloomFilterWriter != null) {
// only add to the bloom filter on a new, unique key
boolean newKey = true;
if (this.lastCell != null) {
switch(bloomType) {
case ROW:
newKey = ! CellUtil.matchingRows(cell, lastCell);
break;
case ROWCOL:
newKey = ! CellUtil.matchingRowColumn(cell, lastCell);
break;
case NONE:
newKey = false;
break;
default:
throw new IOException("Invalid Bloom filter type: " + bloomType +
" (ROW or ROWCOL expected)");
}
}
if (newKey) {
/*
* http://2.bp.blogspot.com/_Cib_A77V54U/StZMrzaKufI/AAAAAAAAADo/ZhK7bGoJdMQ/s400/KeyValue.png
* Key = RowLen + Row + FamilyLen + Column [Family + Qualifier] + TimeStamp
*
* 2 Types of Filtering:
* 1. Row = Row
* 2. RowCol = Row + Qualifier
*/
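// Illustrative example (hypothetical values): for a cell with row "r1", family "f"
// and qualifier "q1", a ROW Bloom key is just the row bytes "r1", while a ROWCOL
// Bloom key is the key of the first-on-row KeyValue built from row "r1", an empty
// family and qualifier "q1", that is, row plus qualifier with no family bytes.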
byte[] bloomKey = null;
// Used with ROW_COL bloom
KeyValue bloomKeyKV = null;
int bloomKeyOffset, bloomKeyLen;
switch (bloomType) {
case ROW:
bloomKey = cell.getRowArray();
bloomKeyOffset = cell.getRowOffset();
bloomKeyLen = cell.getRowLength();
break;
case ROWCOL:
// merge(row, qualifier)
// TODO: could save one buffer copy in case of compound Bloom
// filters when this involves creating a KeyValue
// TODO : Handle while writes also
bloomKeyKV = KeyValueUtil.createFirstOnRow(cell.getRowArray(), cell.getRowOffset(),
cell.getRowLength(),
HConstants.EMPTY_BYTE_ARRAY, 0, 0, cell.getQualifierArray(),
cell.getQualifierOffset(),
cell.getQualifierLength());
bloomKey = bloomKeyKV.getBuffer();
bloomKeyOffset = bloomKeyKV.getKeyOffset();
bloomKeyLen = bloomKeyKV.getKeyLength();
break;
default:
throw new IOException("Invalid Bloom filter type: " + bloomType +
" (ROW or ROWCOL expected)");
}
generalBloomFilterWriter.add(bloomKey, bloomKeyOffset, bloomKeyLen);
if (lastBloomKey != null) {
int res = 0;
// hbase:meta does not have blooms. So we need not have special interpretation
// of the hbase:meta cells. We can safely use Bytes.BYTES_RAWCOMPARATOR for ROW Bloom
if (bloomType == BloomType.ROW) {
res = Bytes.BYTES_RAWCOMPARATOR.compare(bloomKey, bloomKeyOffset, bloomKeyLen,
lastBloomKey, lastBloomKeyOffset, lastBloomKeyLen);
} else {
// TODO : Caching of kv components becomes important in these cases
res = CellComparator.COMPARATOR.compare(bloomKeyKV, lastBloomKeyOnlyKV);
}
if (res <= 0) {
throw new IOException("Non-increasing Bloom keys: "
+ Bytes.toStringBinary(bloomKey, bloomKeyOffset, bloomKeyLen) + " after "
+ Bytes.toStringBinary(lastBloomKey, lastBloomKeyOffset, lastBloomKeyLen));
}
}
lastBloomKey = bloomKey;
lastBloomKeyOffset = bloomKeyOffset;
lastBloomKeyLen = bloomKeyLen;
if (bloomType == BloomType.ROWCOL) {
lastBloomKeyOnlyKV.setKey(bloomKey, bloomKeyOffset, bloomKeyLen);
}
this.lastCell = cell;
}
}
}
private void appendDeleteFamilyBloomFilter(final Cell cell)
throws IOException {
if (!CellUtil.isDeleteFamily(cell) && !CellUtil.isDeleteFamilyVersion(cell)) {
return;
}
// increase the number of delete family in the store file
deleteFamilyCnt++;
if (null != this.deleteFamilyBloomFilterWriter) {
boolean newKey = true;
if (lastDeleteFamilyCell != null) {
// hbase:meta does not have blooms. So we need not have special interpretation
// of the hbase:meta cells
newKey = !CellUtil.matchingRows(cell, lastDeleteFamilyCell);
}
if (newKey) {
this.deleteFamilyBloomFilterWriter.add(cell.getRowArray(),
cell.getRowOffset(), cell.getRowLength());
this.lastDeleteFamilyCell = cell;
}
}
}
public void append(final Cell cell) throws IOException {
appendGeneralBloomfilter(cell);
appendDeleteFamilyBloomFilter(cell);
writer.append(cell);
trackTimestamps(cell);
}
public Path getPath() {
return this.writer.getPath();
}
public boolean hasGeneralBloom() {
return this.generalBloomFilterWriter != null;
}
/**
* For unit testing only.
*
* @return the Bloom filter used by this writer.
*/
BloomFilterWriter getGeneralBloomWriter() {
return generalBloomFilterWriter;
}
private boolean closeBloomFilter(BloomFilterWriter bfw) throws IOException {
boolean haveBloom = (bfw != null && bfw.getKeyCount() > 0);
if (haveBloom) {
bfw.compactBloom();
}
return haveBloom;
}
private boolean closeGeneralBloomFilter() throws IOException {
boolean hasGeneralBloom = closeBloomFilter(generalBloomFilterWriter);
// add the general Bloom filter writer and append file info
if (hasGeneralBloom) {
writer.addGeneralBloomFilter(generalBloomFilterWriter);
writer.appendFileInfo(StoreFile.BLOOM_FILTER_TYPE_KEY,
Bytes.toBytes(bloomType.toString()));
if (lastBloomKey != null) {
writer.appendFileInfo(StoreFile.LAST_BLOOM_KEY, Arrays.copyOfRange(
lastBloomKey, lastBloomKeyOffset, lastBloomKeyOffset
+ lastBloomKeyLen));
}
}
return hasGeneralBloom;
}
private boolean closeDeleteFamilyBloomFilter() throws IOException {
boolean hasDeleteFamilyBloom = closeBloomFilter(deleteFamilyBloomFilterWriter);
// add the delete family Bloom filter writer
if (hasDeleteFamilyBloom) {
writer.addDeleteFamilyBloomFilter(deleteFamilyBloomFilterWriter);
}
// append file info about the number of delete family kvs
// even if there is no delete family Bloom.
writer.appendFileInfo(StoreFile.DELETE_FAMILY_COUNT,
Bytes.toBytes(this.deleteFamilyCnt));
return hasDeleteFamilyBloom;
}
public void close() throws IOException {
boolean hasGeneralBloom = this.closeGeneralBloomFilter();
boolean hasDeleteFamilyBloom = this.closeDeleteFamilyBloomFilter();
writer.close();
// Log final Bloom filter statistics. This needs to be done after close()
// because compound Bloom filters might be finalized as part of closing.
if (LOG.isTraceEnabled()) {
LOG.trace((hasGeneralBloom ? "" : "NO ") + "General Bloom and " +
(hasDeleteFamilyBloom ? "" : "NO ") + "DeleteFamily" + " was added to HFile " +
getPath());
}
}
public void appendFileInfo(byte[] key, byte[] value) throws IOException {
writer.appendFileInfo(key, value);
}
/** For use in testing.
*/
HFile.Writer getHFileWriter() {
return writer;
}
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="ICAST_INTEGER_MULTIPLY_CAST_TO_LONG",
justification="Will not overflow")
public static class Builder {
private final Configuration conf;
private final CacheConfig cacheConf;
private final FileSystem fs;
private CellComparator comparator = CellComparator.COMPARATOR;
private BloomType bloomType = BloomType.NONE;
private long maxKeyCount = 0;
private Path dir;
private Path filePath;
private InetSocketAddress[] favoredNodes;
private HFileContext fileContext;
public Builder(Configuration conf, CacheConfig cacheConf,
FileSystem fs) {
this.conf = conf;
this.cacheConf = cacheConf;
this.fs = fs;
}
/**
* Use either this method or {@link #withFilePath}, but not both.
* @param dir Path to column family directory. The directory is created if it
* does not exist. The file is given a unique name within this
* directory.
* @return this (for chained invocation)
*/
public Builder withOutputDir(Path dir) {
Preconditions.checkNotNull(dir);
this.dir = dir;
return this;
}
/**
* Use either this method or {@link #withOutputDir}, but not both.
* @param filePath the StoreFile path to write
* @return this (for chained invocation)
*/
public Builder withFilePath(Path filePath) {
Preconditions.checkNotNull(filePath);
this.filePath = filePath;
return this;
}
/**
* @param favoredNodes an array of favored nodes or possibly null
* @return this (for chained invocation)
*/
public Builder withFavoredNodes(InetSocketAddress[] favoredNodes) {
this.favoredNodes = favoredNodes;
return this;
}
public Builder withComparator(CellComparator comparator) {
Preconditions.checkNotNull(comparator);
this.comparator = comparator;
return this;
}
public Builder withBloomType(BloomType bloomType) {
Preconditions.checkNotNull(bloomType);
this.bloomType = bloomType;
return this;
}
/**
* @param maxKeyCount estimated maximum number of keys we expect to add
* @return this (for chained invocation)
*/
public Builder withMaxKeyCount(long maxKeyCount) {
this.maxKeyCount = maxKeyCount;
return this;
}
public Builder withFileContext(HFileContext fileContext) {
this.fileContext = fileContext;
return this;
}
public Builder withShouldDropCacheBehind(boolean shouldDropCacheBehind/*NOT USED!!*/) {
// TODO: HAS NO EFFECT!!! FIX!!
return this;
}
/**
* Create a store file writer. The caller is responsible for closing the file when
* done. If metadata is needed, add it BEFORE closing using
* {@link StoreFileWriter#appendMetadata}.
*/
public StoreFileWriter build() throws IOException {
if ((dir == null ? 0 : 1) + (filePath == null ? 0 : 1) != 1) {
throw new IllegalArgumentException("Either specify parent directory " +
"or file path");
}
if (dir == null) {
dir = filePath.getParent();
}
if (!fs.exists(dir)) {
fs.mkdirs(dir);
}
if (filePath == null) {
filePath = StoreFile.getUniqueFile(fs, dir);
if (!BloomFilterFactory.isGeneralBloomEnabled(conf)) {
bloomType = BloomType.NONE;
}
}
if (comparator == null) {
comparator = CellComparator.COMPARATOR;
}
return new StoreFileWriter(fs, filePath,
conf, cacheConf, comparator, bloomType, maxKeyCount, favoredNodes, fileContext);
}
}
}
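For orientation, here is a minimal usage sketch of the extracted class (not part of this patch; the conf, cacheConf, fs, outputDir, row, family, qualifier and maxSequenceId variables are assumed to be set up as in the tests further down, and imports match those files): build the writer through StoreFileWriter.Builder, append cells, then append metadata before close().
// A minimal sketch, assuming conf, cacheConf, fs, outputDir and the cell/sequence-id
// variables below already exist; error handling is omitted for brevity.
HFileContext context = new HFileContextBuilder().withBlockSize(8 * 1024).build();
StoreFileWriter sketchWriter = new StoreFileWriter.Builder(conf, cacheConf, fs)
    .withOutputDir(outputDir)                  // or withFilePath(...), but not both
    .withComparator(CellComparator.COMPARATOR)
    .withBloomType(BloomType.ROW)
    .withMaxKeyCount(1000)
    .withFileContext(context)
    .build();
try {
  sketchWriter.append(new KeyValue(row, family, qualifier, Bytes.toBytes("value")));
  // Metadata must be appended BEFORE close(), since it is written into the file.
  sketchWriter.appendMetadata(maxSequenceId, /* majorCompaction = */ false);
} finally {
  sketchWriter.close();
}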

View File

@@ -61,7 +61,7 @@ abstract class StoreFlusher {
public abstract List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushSeqNum,
MonitoredTask status, ThroughputController throughputController) throws IOException;
protected void finalizeWriter(StoreFile.Writer writer, long cacheFlushSeqNum,
protected void finalizeWriter(StoreFileWriter writer, long cacheFlushSeqNum,
MonitoredTask status) throws IOException {
// Write out the log sequence number that corresponds to this output
// hfile. Also write current time in metadata as minFlushTime.

View File

@@ -74,7 +74,7 @@ public class StoreUtils {
long maxSize = -1L;
StoreFile largestSf = null;
for (StoreFile sf : candidates) {
StoreFile.Reader r = sf.getReader();
StoreFileReader r = sf.getReader();
if (r == null) continue;
long size = r.length();
if (size > maxSize) {

View File

@@ -30,7 +30,6 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
import org.apache.hadoop.hbase.util.Bytes;
/**
@@ -43,7 +42,7 @@ public abstract class StripeMultiFileWriter extends AbstractMultiFileWriter {
private static final Log LOG = LogFactory.getLog(StripeMultiFileWriter.class);
protected final CellComparator comparator;
protected List<StoreFile.Writer> existingWriters;
protected List<StoreFileWriter> existingWriters;
protected List<byte[]> boundaries;
/** Whether to write stripe metadata */
@@ -58,7 +57,7 @@ public abstract class StripeMultiFileWriter extends AbstractMultiFileWriter {
}
@Override
protected Collection<Writer> writers() {
protected Collection<StoreFileWriter> writers() {
return existingWriters;
}
@@ -73,7 +72,7 @@ public abstract class StripeMultiFileWriter extends AbstractMultiFileWriter {
}
@Override
protected void preCloseWriter(Writer writer) throws IOException {
protected void preCloseWriter(StoreFileWriter writer) throws IOException {
if (doWriteStripeMetadata) {
if (LOG.isDebugEnabled()) {
LOG.debug("Write stripe metadata for " + writer.getPath().toString());
@@ -126,7 +125,7 @@ public abstract class StripeMultiFileWriter extends AbstractMultiFileWriter {
* separate from all other such pairs.
*/
public static class BoundaryMultiWriter extends StripeMultiFileWriter {
private StoreFile.Writer currentWriter;
private StoreFileWriter currentWriter;
private byte[] currentWriterEndKey;
private Cell lastCell;
@@ -144,7 +143,7 @@ public abstract class StripeMultiFileWriter extends AbstractMultiFileWriter {
byte[] majorRangeFrom, byte[] majorRangeTo) throws IOException {
super(comparator);
this.boundaries = targetBoundaries;
this.existingWriters = new ArrayList<StoreFile.Writer>(this.boundaries.size() - 1);
this.existingWriters = new ArrayList<StoreFileWriter>(this.boundaries.size() - 1);
// "major" range (range for which all files are included) boundaries, if any,
// must match some target boundaries, let's find them.
assert (majorRangeFrom == null) == (majorRangeTo == null);
@@ -264,7 +263,7 @@ public abstract class StripeMultiFileWriter extends AbstractMultiFileWriter {
private byte[] right;
private Cell lastCell;
private StoreFile.Writer currentWriter;
private StoreFileWriter currentWriter;
protected byte[] lastRowInCurrentWriter = null;
private long cellsInCurrentWriter = 0;
private long cellsSeen = 0;
@@ -284,7 +283,7 @@ public abstract class StripeMultiFileWriter extends AbstractMultiFileWriter {
this.left = left;
this.right = right;
int preallocate = Math.min(this.targetCount, 64);
this.existingWriters = new ArrayList<StoreFile.Writer>(preallocate);
this.existingWriters = new ArrayList<StoreFileWriter>(preallocate);
this.boundaries = new ArrayList<byte[]>(preallocate + 1);
}

View File

@@ -31,8 +31,6 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
import org.apache.hadoop.hbase.regionserver.StripeMultiFileWriter;
import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
@@ -110,8 +108,8 @@ public class StripeStoreFlusher extends StoreFlusher {
final TimeRangeTracker tracker, final long kvCount) {
return new StripeMultiFileWriter.WriterFactory() {
@Override
public Writer createWriter() throws IOException {
StoreFile.Writer writer = store.createWriterInTmp(
public StoreFileWriter createWriter() throws IOException {
StoreFileWriter writer = store.createWriterInTmp(
kvCount, store.getFamily().getCompressionType(),
/* isCompaction = */ false,
/* includeMVCCReadpoint = */ true,

View File

@@ -29,7 +29,7 @@ import org.apache.hadoop.hbase.regionserver.AbstractMultiFileWriter;
import org.apache.hadoop.hbase.regionserver.AbstractMultiFileWriter.WriterFactory;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
/**
@@ -50,7 +50,7 @@ public abstract class AbstractMultiOutputCompactor<T extends AbstractMultiFileWr
final FileDetails fd, final boolean shouldDropBehind) {
WriterFactory writerFactory = new WriterFactory() {
@Override
public Writer createWriter() throws IOException {
public StoreFileWriter createWriter() throws IOException {
return createTmpWriter(fd, shouldDropBehind);
}
};

View File

@@ -33,7 +33,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFile.Reader;
import org.apache.hadoop.hbase.regionserver.StoreFileReader;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.util.StringUtils;
@@ -256,7 +256,7 @@ public class CompactionRequest implements Comparable<CompactionRequest> {
private void recalculateSize() {
long sz = 0;
for (StoreFile sf : this.filesToCompact) {
Reader r = sf.getReader();
StoreFileReader r = sf.getReader();
sz += r == null ? 0 : r.length();
}
this.totalSize = sz;

View File

@@ -46,8 +46,9 @@ import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
import org.apache.hadoop.hbase.regionserver.StoreFileReader;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
import org.apache.hadoop.hbase.regionserver.compactions.Compactor.CellSink;
@@ -144,7 +145,7 @@ public abstract class Compactor<T extends CellSink> {
}
long seqNum = file.getMaxSequenceId();
fd.maxSeqId = Math.max(fd.maxSeqId, seqNum);
StoreFile.Reader r = file.getReader();
StoreFileReader r = file.getReader();
if (r == null) {
LOG.warn("Null reader for " + file.getPath());
continue;
@@ -258,7 +259,8 @@ public abstract class Compactor<T extends CellSink> {
* @return Writer for a new StoreFile in the tmp dir.
* @throws IOException if creation failed
*/
protected Writer createTmpWriter(FileDetails fd, boolean shouldDropBehind) throws IOException {
protected StoreFileWriter createTmpWriter(FileDetails fd, boolean shouldDropBehind)
throws IOException {
// When all MVCC readpoints are 0, don't write them.
// See HBASE-8166, HBASE-12600, and HBASE-13389.
return store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression,

View File

@@ -29,7 +29,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User;
@@ -41,22 +41,22 @@ import com.google.common.collect.Lists;
* {@link #compact(CompactionRequest, ThroughputController, User)}
*/
@InterfaceAudience.Private
public class DefaultCompactor extends Compactor<Writer> {
public class DefaultCompactor extends Compactor<StoreFileWriter> {
private static final Log LOG = LogFactory.getLog(DefaultCompactor.class);
public DefaultCompactor(final Configuration conf, final Store store) {
super(conf, store);
}
private final CellSinkFactory<Writer> writerFactory = new CellSinkFactory<Writer>() {
@Override
public Writer createWriter(InternalScanner scanner,
org.apache.hadoop.hbase.regionserver.compactions.Compactor.FileDetails fd,
boolean shouldDropBehind) throws IOException {
return createTmpWriter(fd, shouldDropBehind);
}
};
private final CellSinkFactory<StoreFileWriter> writerFactory =
new CellSinkFactory<StoreFileWriter>() {
@Override
public StoreFileWriter createWriter(InternalScanner scanner,
org.apache.hadoop.hbase.regionserver.compactions.Compactor.FileDetails fd,
boolean shouldDropBehind) throws IOException {
return createTmpWriter(fd, shouldDropBehind);
}
};
/**
* Do a minor/major compaction on an explicit set of storefiles from a Store.
@@ -84,7 +84,7 @@ public class DefaultCompactor extends Compactor<Writer> {
}
@Override
protected List<Path> commitWriter(Writer writer, FileDetails fd,
protected List<Path> commitWriter(StoreFileWriter writer, FileDetails fd,
CompactionRequest request) throws IOException {
List<Path> newFiles = Lists.newArrayList(writer.getPath());
writer.appendMetadata(fd.maxSeqId, request.isAllFiles());
@@ -93,7 +93,7 @@ public class DefaultCompactor extends Compactor<Writer> {
}
@Override
protected void abortWriter(Writer writer) throws IOException {
protected void abortWriter(StoreFileWriter writer) throws IOException {
Path leftoverFile = writer.getPath();
try {
writer.close();

View File

@@ -64,7 +64,7 @@ import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFile.Reader;
import org.apache.hadoop.hbase.regionserver.StoreFileReader;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
@@ -683,17 +683,17 @@ public class SimpleRegionObserver extends BaseRegionObserver {
}
@Override
public Reader preStoreFileReaderOpen(ObserverContext<RegionCoprocessorEnvironment> ctx,
public StoreFileReader preStoreFileReaderOpen(ObserverContext<RegionCoprocessorEnvironment> ctx,
FileSystem fs, Path p, FSDataInputStreamWrapper in, long size, CacheConfig cacheConf,
Reference r, Reader reader) throws IOException {
Reference r, StoreFileReader reader) throws IOException {
ctPreStoreFileReaderOpen.incrementAndGet();
return null;
}
@Override
public Reader postStoreFileReaderOpen(ObserverContext<RegionCoprocessorEnvironment> ctx,
public StoreFileReader postStoreFileReaderOpen(ObserverContext<RegionCoprocessorEnvironment> ctx,
FileSystem fs, Path p, FSDataInputStreamWrapper in, long size, CacheConfig cacheConf,
Reference r, Reader reader) throws IOException {
Reference r, StoreFileReader reader) throws IOException {
ctPostStoreFileReaderOpen.incrementAndGet();
return reader;
}
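For reference, a minimal sketch of what a downstream observer looks like against the renamed hook (the ReaderOpenLoggingObserver class and its logging are illustrative only, not part of this change):
public class ReaderOpenLoggingObserver extends BaseRegionObserver {
  private static final Log LOG = LogFactory.getLog(ReaderOpenLoggingObserver.class);

  @Override
  public StoreFileReader postStoreFileReaderOpen(ObserverContext<RegionCoprocessorEnvironment> ctx,
      FileSystem fs, Path p, FSDataInputStreamWrapper in, long size, CacheConfig cacheConf,
      Reference r, StoreFileReader reader) throws IOException {
    // Purely observational: note which store file was opened and return the reader unchanged.
    LOG.info("Opened StoreFileReader for " + p + " (" + size + " bytes)");
    return reader;
  }
}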

View File

@@ -55,7 +55,7 @@ import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.BloomFilterFactory;
@@ -369,7 +369,7 @@ public class TestCacheOnWrite {
.withBlockSize(DATA_BLOCK_SIZE)
.withDataBlockEncoding(NoOpDataBlockEncoder.INSTANCE.getDataBlockEncoding())
.withIncludesTags(useTags).build();
StoreFile.Writer sfw = new StoreFile.WriterBuilder(conf, cacheConf, fs)
StoreFileWriter sfw = new StoreFileWriter.Builder(conf, cacheConf, fs)
.withOutputDir(storeFileParentDir).withComparator(CellComparator.COMPARATOR)
.withFileContext(meta)
.withBloomType(BLOOM_TYPE).withMaxKeyCount(NUM_KV).build();

View File

@@ -29,8 +29,8 @@ import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.Before;
@@ -96,7 +96,7 @@ public class TestPrefetch {
HFileContext meta = new HFileContextBuilder()
.withBlockSize(DATA_BLOCK_SIZE)
.build();
StoreFile.Writer sfw = new StoreFile.WriterBuilder(conf, cacheConf, fs)
StoreFileWriter sfw = new StoreFileWriter.Builder(conf, cacheConf, fs)
.withOutputDir(storeFileParentDir)
.withComparator(CellComparator.COMPARATOR)
.withFileContext(meta)

View File

@@ -37,7 +37,7 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.BloomFilterFactory;
@@ -115,8 +115,8 @@ public class TestSeekBeforeWithInlineBlocks {
.withBlockSize(DATA_BLOCK_SIZE)
.build();
StoreFile.Writer storeFileWriter =
new StoreFile.WriterBuilder(conf, cacheConf, fs)
StoreFileWriter storeFileWriter =
new StoreFileWriter.Builder(conf, cacheConf, fs)
.withFilePath(hfilePath)
.withFileContext(meta)
.withBloomType(bloomType)

View File

@@ -29,7 +29,7 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Assert;
@@ -47,7 +47,7 @@ public class MobTestUtil {
}
return sb.toString();
}
protected static void writeStoreFile(final StoreFile.Writer writer, String caseName)
protected static void writeStoreFile(final StoreFileWriter writer, String caseName)
throws IOException {
writeStoreFile(writer, Bytes.toBytes(caseName), Bytes.toBytes(caseName));
}
@@ -60,7 +60,7 @@
*
* @throws IOException
*/
private static void writeStoreFile(final StoreFile.Writer writer, byte[] fam,
private static void writeStoreFile(final StoreFileWriter writer, byte[] fam,
byte[] qualifier) throws IOException {
long now = System.currentTimeMillis();
try {

View File

@@ -32,7 +32,7 @@ import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Assert;
@@ -57,7 +57,7 @@ public class TestCachedMobFile extends TestCase{
Path testDir = TEST_UTIL.getDataTestDir();
FileSystem fs = testDir.getFileSystem(conf);
HFileContext meta = new HFileContextBuilder().withBlockSize(8*1024).build();
StoreFile.Writer writer = new StoreFile.WriterBuilder(conf, cacheConf, fs)
StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, fs)
.withOutputDir(testDir).withFileContext(meta).build();
MobTestUtil.writeStoreFile(writer, caseName);
CachedMobFile cachedMobFile = CachedMobFile.create(fs, writer.getPath(), conf, cacheConf);
@@ -79,12 +79,12 @@ public class TestCachedMobFile extends TestCase{
FileSystem fs = testDir.getFileSystem(conf);
Path outputDir1 = new Path(testDir, FAMILY1);
HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build();
StoreFile.Writer writer1 = new StoreFile.WriterBuilder(conf, cacheConf, fs)
StoreFileWriter writer1 = new StoreFileWriter.Builder(conf, cacheConf, fs)
.withOutputDir(outputDir1).withFileContext(meta).build();
MobTestUtil.writeStoreFile(writer1, caseName);
CachedMobFile cachedMobFile1 = CachedMobFile.create(fs, writer1.getPath(), conf, cacheConf);
Path outputDir2 = new Path(testDir, FAMILY2);
StoreFile.Writer writer2 = new StoreFile.WriterBuilder(conf, cacheConf, fs)
StoreFileWriter writer2 = new StoreFileWriter.Builder(conf, cacheConf, fs)
.withOutputDir(outputDir2)
.withFileContext(meta)
.build();
@@ -102,7 +102,7 @@ public class TestCachedMobFile extends TestCase{
Path testDir = TEST_UTIL.getDataTestDir();
FileSystem fs = testDir.getFileSystem(conf);
HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build();
StoreFile.Writer writer = new StoreFile.WriterBuilder(conf, cacheConf, fs)
StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, fs)
.withOutputDir(testDir).withFileContext(meta).build();
String caseName = getName();
MobTestUtil.writeStoreFile(writer, caseName);

View File

@@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;
@@ -52,7 +53,7 @@ public class TestMobFile extends TestCase {
Path testDir = TEST_UTIL.getDataTestDir();
FileSystem fs = testDir.getFileSystem(conf);
HFileContext meta = new HFileContextBuilder().withBlockSize(8*1024).build();
StoreFile.Writer writer = new StoreFile.WriterBuilder(conf, cacheConf, fs)
StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, fs)
.withOutputDir(testDir)
.withFileContext(meta)
.build();
@@ -105,7 +106,7 @@ public class TestMobFile extends TestCase {
Path testDir = TEST_UTIL.getDataTestDir();
FileSystem fs = testDir.getFileSystem(conf);
HFileContext meta = new HFileContextBuilder().withBlockSize(8*1024).build();
StoreFile.Writer writer = new StoreFile.WriterBuilder(conf, cacheConf, fs)
StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, fs)
.withOutputDir(testDir)
.withFileContext(meta)
.build();

View File

@@ -37,7 +37,7 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.HMobStore;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;
@@ -133,7 +133,7 @@ public class TestMobFileCache extends TestCase {
KeyValue[] keys = new KeyValue[] { key1, key2, key3 };
int maxKeyCount = keys.length;
HRegionInfo regionInfo = new HRegionInfo(tn);
StoreFile.Writer mobWriter = mobStore.createWriterInTmp(currentDate,
StoreFileWriter mobWriter = mobStore.createWriterInTmp(currentDate,
maxKeyCount, hcd.getCompactionCompression(), regionInfo.getStartKey());
Path mobFilePath = mobWriter.getPath();
String fileName = mobFilePath.getName();

View File

@@ -48,8 +48,6 @@ import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.mob.MobConstants;
import org.apache.hadoop.hbase.mob.MobFileName;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest;
import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactor;
import org.apache.hadoop.hbase.mob.compactions.MobCompactionRequest.CompactionType;
import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.CompactionPartition;
import org.apache.hadoop.hbase.util.Bytes;
@@ -306,7 +304,7 @@ public class TestPartitionedMobCompactor {
mobFileName = MobFileName.create(Bytes.toBytes(startKey + i), MobUtils.formatDate(
new Date()), mobSuffix);
}
StoreFile.Writer mobFileWriter = new StoreFile.WriterBuilder(conf, cacheConf, fs)
StoreFileWriter mobFileWriter = new StoreFileWriter.Builder(conf, cacheConf, fs)
.withFileContext(meta).withFilePath(new Path(basePath, mobFileName.getFileName())).build();
writeStoreFile(mobFileWriter, startRow, Bytes.toBytes(family), Bytes.toBytes(qualifier),
type, (i+1)*1000);
@@ -322,7 +320,7 @@ public class TestPartitionedMobCompactor {
* @param type the key type
* @param size the size of value
*/
private static void writeStoreFile(final StoreFile.Writer writer, byte[]row, byte[] family,
private static void writeStoreFile(final StoreFileWriter writer, byte[]row, byte[] family,
byte[] qualifier, Type type, int size) throws IOException {
long now = System.currentTimeMillis();
try {

View File

@@ -185,7 +185,7 @@ public class CreateRandomStoreFile {
HFileContext meta = new HFileContextBuilder().withCompression(compr)
.withBlockSize(blockSize).build();
StoreFile.Writer sfw = new StoreFile.WriterBuilder(conf,
StoreFileWriter sfw = new StoreFileWriter.Builder(conf,
new CacheConfig(conf), fs)
.withOutputDir(outputDir)
.withBloomType(bloomType)

View File

@@ -595,7 +595,7 @@ public class DataBlockEncodingTool {
StoreFile hsf = new StoreFile(fs, path, conf, cacheConf,
BloomType.NONE);
StoreFile.Reader reader = hsf.createReader();
StoreFileReader reader = hsf.createReader();
reader.loadFileInfo();
KeyValueScanner scanner = reader.getStoreFileScanner(true, true);

View File

@@ -62,7 +62,7 @@ public class EncodedSeekPerformanceTest {
StoreFile storeFile = new StoreFile(testingUtility.getTestFileSystem(),
path, configuration, cacheConf, BloomType.NONE);
StoreFile.Reader reader = storeFile.createReader();
StoreFileReader reader = storeFile.createReader();
StoreFileScanner scanner = reader.getStoreFileScanner(true, false);
Cell current;
@@ -94,7 +94,7 @@ public class EncodedSeekPerformanceTest {
long totalSize = 0;
StoreFile.Reader reader = storeFile.createReader();
StoreFileReader reader = storeFile.createReader();
StoreFileScanner scanner = reader.getStoreFileScanner(true, false);
long startReadingTime = System.nanoTime();

View File

@@ -27,7 +27,6 @@ import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -131,11 +130,11 @@ public class MockStoreFile extends StoreFile {
}
@Override
public StoreFile.Reader getReader() {
public StoreFileReader getReader() {
final long len = this.length;
final TimeRangeTracker timeRange = this.timeRangeTracker;
final long entries = this.entryCount;
return new StoreFile.Reader() {
return new StoreFileReader() {
@Override
public long length() {
return len;

View File

@@ -207,7 +207,7 @@ public class TestCacheOnWriteInSchema {
@Test
public void testCacheOnWriteInSchema() throws IOException {
// Write some random data into the store
StoreFile.Writer writer = store.createWriterInTmp(Integer.MAX_VALUE,
StoreFileWriter writer = store.createWriterInTmp(Integer.MAX_VALUE,
HFile.DEFAULT_COMPRESSION_ALGORITHM, false, true, false);
writeStoreFile(writer);
writer.close();
@@ -267,7 +267,7 @@ public class TestCacheOnWriteInSchema {
}
}
private void writeStoreFile(StoreFile.Writer writer) throws IOException {
private void writeStoreFile(StoreFileWriter writer) throws IOException {
final int rowLen = 32;
for (int i = 0; i < NUM_KV; ++i) {
byte[] k = RandomKeyValueUtil.randomOrderedKey(rand, i);

View File

@@ -562,7 +562,7 @@ public class TestCompaction {
private static StoreFile createFile() throws Exception {
StoreFile sf = mock(StoreFile.class);
when(sf.getPath()).thenReturn(new Path("file"));
StoreFile.Reader r = mock(StoreFile.Reader.class);
StoreFileReader r = mock(StoreFileReader.class);
when(r.length()).thenReturn(10L);
when(sf.getReader()).thenReturn(r);
return sf;

View File

@@ -201,7 +201,7 @@ public class TestCompoundBloomFilter {
private void readStoreFile(int t, BloomType bt, List<KeyValue> kvs,
Path sfPath) throws IOException {
StoreFile sf = new StoreFile(fs, sfPath, conf, cacheConf, bt);
StoreFile.Reader r = sf.createReader();
StoreFileReader r = sf.createReader();
final boolean pread = true; // does not really matter
StoreFileScanner scanner = r.getStoreFileScanner(true, pread);
@@ -301,7 +301,7 @@ public class TestCompoundBloomFilter {
conf.setBoolean(CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY, true);
cacheConf = new CacheConfig(conf);
HFileContext meta = new HFileContextBuilder().withBlockSize(BLOCK_SIZES[t]).build();
StoreFile.Writer w = new StoreFile.WriterBuilder(conf, cacheConf, fs)
StoreFileWriter w = new StoreFileWriter.Builder(conf, cacheConf, fs)
.withOutputDir(TEST_UTIL.getDataTestDir())
.withBloomType(bt)
.withFileContext(meta)

View File

@@ -81,7 +81,7 @@ public class TestFSErrorsExposed {
FileSystem fs = new HFileSystem(faultyfs);
CacheConfig cacheConf = new CacheConfig(util.getConfiguration());
HFileContext meta = new HFileContextBuilder().withBlockSize(2 * 1024).build();
StoreFile.Writer writer = new StoreFile.WriterBuilder(
StoreFileWriter writer = new StoreFileWriter.Builder(
util.getConfiguration(), cacheConf, hfs)
.withOutputDir(hfilePath)
.withFileContext(meta)
@@ -92,7 +92,7 @@ public class TestFSErrorsExposed {
StoreFile sf = new StoreFile(fs, writer.getPath(),
util.getConfiguration(), cacheConf, BloomType.NONE);
StoreFile.Reader reader = sf.createReader();
StoreFileReader reader = sf.createReader();
HFileScanner scanner = reader.getScanner(false, true);
FaultyInputStream inStream = faultyfs.inStreams.get(0).get();
@@ -131,7 +131,7 @@ public class TestFSErrorsExposed {
HFileSystem fs = new HFileSystem(faultyfs);
CacheConfig cacheConf = new CacheConfig(util.getConfiguration());
HFileContext meta = new HFileContextBuilder().withBlockSize(2 * 1024).build();
StoreFile.Writer writer = new StoreFile.WriterBuilder(
StoreFileWriter writer = new StoreFileWriter.Builder(
util.getConfiguration(), cacheConf, hfs)
.withOutputDir(hfilePath)
.withFileContext(meta)

View File

@@ -178,7 +178,7 @@ public class TestHMobStore {
KeyValue key3 = new KeyValue(row2, family, qf3, 1, value2);
KeyValue[] keys = new KeyValue[] { key1, key2, key3 };
int maxKeyCount = keys.length;
StoreFile.Writer mobWriter = store.createWriterInTmp(currentDate, maxKeyCount,
StoreFileWriter mobWriter = store.createWriterInTmp(currentDate, maxKeyCount,
hcd.getCompactionCompression(), region.getRegionInfo().getStartKey());
mobFilePath = mobWriter.getPath();

View File

@@ -4214,7 +4214,7 @@ public class TestHRegion {
HStore store = (HStore) region.getStore(fam1);
Collection<StoreFile> storeFiles = store.getStorefiles();
for (StoreFile storefile : storeFiles) {
StoreFile.Reader reader = storefile.getReader();
StoreFileReader reader = storefile.getReader();
reader.loadFileInfo();
reader.loadBloomfilter();
assertEquals(num_unique_rows * duplicate_multiplier, reader.getEntries());
@@ -4226,7 +4226,7 @@ public class TestHRegion {
// after compaction
storeFiles = store.getStorefiles();
for (StoreFile storefile : storeFiles) {
StoreFile.Reader reader = storefile.getReader();
StoreFileReader reader = storefile.getReader();
reader.loadFileInfo();
reader.loadBloomfilter();
assertEquals(num_unique_rows * duplicate_multiplier * num_storefiles, reader.getEntries());

View File

@@ -97,7 +97,7 @@ public class TestReversibleScanners {
hcBuilder.withBlockSize(2 * 1024);
hcBuilder.withDataBlockEncoding(encoding);
HFileContext hFileContext = hcBuilder.build();
StoreFile.Writer writer = new StoreFile.WriterBuilder(
StoreFileWriter writer = new StoreFileWriter.Builder(
TEST_UTIL.getConfiguration(), cacheConf, fs).withOutputDir(hfilePath)
.withFileContext(hFileContext).build();
writeStoreFile(writer);
@@ -145,15 +145,15 @@ public class TestReversibleScanners {
HFileContextBuilder hcBuilder = new HFileContextBuilder();
hcBuilder.withBlockSize(2 * 1024);
HFileContext hFileContext = hcBuilder.build();
StoreFile.Writer writer1 = new StoreFile.WriterBuilder(
StoreFileWriter writer1 = new StoreFileWriter.Builder(
TEST_UTIL.getConfiguration(), cacheConf, fs).withOutputDir(
hfilePath).withFileContext(hFileContext).build();
StoreFile.Writer writer2 = new StoreFile.WriterBuilder(
StoreFileWriter writer2 = new StoreFileWriter.Builder(
TEST_UTIL.getConfiguration(), cacheConf, fs).withOutputDir(
hfilePath).withFileContext(hFileContext).build();
MemStore memstore = new DefaultMemStore();
writeMemstoreAndStoreFiles(memstore, new StoreFile.Writer[] { writer1,
writeMemstoreAndStoreFiles(memstore, new StoreFileWriter[] { writer1,
writer2 });
StoreFile sf1 = new StoreFile(fs, writer1.getPath(),
@@ -235,15 +235,15 @@ public class TestReversibleScanners {
HFileContextBuilder hcBuilder = new HFileContextBuilder();
hcBuilder.withBlockSize(2 * 1024);
HFileContext hFileContext = hcBuilder.build();
StoreFile.Writer writer1 = new StoreFile.WriterBuilder(
StoreFileWriter writer1 = new StoreFileWriter.Builder(
TEST_UTIL.getConfiguration(), cacheConf, fs).withOutputDir(
hfilePath).withFileContext(hFileContext).build();
StoreFile.Writer writer2 = new StoreFile.WriterBuilder(
StoreFileWriter writer2 = new StoreFileWriter.Builder(
TEST_UTIL.getConfiguration(), cacheConf, fs).withOutputDir(
hfilePath).withFileContext(hFileContext).build();
MemStore memstore = new DefaultMemStore();
writeMemstoreAndStoreFiles(memstore, new StoreFile.Writer[] { writer1,
writeMemstoreAndStoreFiles(memstore, new StoreFileWriter[] { writer1,
writer2 });
StoreFile sf1 = new StoreFile(fs, writer1.getPath(),
@@ -633,7 +633,7 @@ public class TestReversibleScanners {
}
private static void writeMemstoreAndStoreFiles(MemStore memstore,
final StoreFile.Writer[] writers) throws IOException {
final StoreFileWriter[] writers) throws IOException {
try {
for (int i = 0; i < ROWSIZE; i++) {
for (int j = 0; j < QUALSIZE; j++) {
@@ -651,7 +651,7 @@ public class TestReversibleScanners {
}
}
private static void writeStoreFile(final StoreFile.Writer writer)
private static void writeStoreFile(final StoreFileWriter writer)
throws IOException {
try {
for (int i = 0; i < ROWSIZE; i++) {

View File

@@ -147,7 +147,7 @@ public class TestScanWithBloomError {
}
});
StoreFile.Reader lastStoreFileReader = null;
StoreFileReader lastStoreFileReader = null;
for (StoreFileScanner sfScanner : scanners)
lastStoreFileReader = sfScanner.getReader();

View File

@@ -267,7 +267,7 @@ public class TestStore {
init(name.getMethodName(), conf, hcd);
// Test createWriterInTmp()
StoreFile.Writer writer = store.createWriterInTmp(4, hcd.getCompressionType(), false, true, false);
StoreFileWriter writer = store.createWriterInTmp(4, hcd.getCompressionType(), false, true, false);
Path path = writer.getPath();
writer.append(new KeyValue(row, family, qf1, Bytes.toBytes(1)));
writer.append(new KeyValue(row, family, qf2, Bytes.toBytes(2)));
@@ -434,7 +434,7 @@ public class TestStore {
Configuration c = HBaseConfiguration.create();
FileSystem fs = FileSystem.get(c);
HFileContext meta = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL).build();
StoreFile.Writer w = new StoreFile.WriterBuilder(c, new CacheConfig(c),
StoreFileWriter w = new StoreFileWriter.Builder(c, new CacheConfig(c),
fs)
.withOutputDir(storedir)
.withFileContext(meta)
@@ -1009,7 +1009,7 @@ public class TestStore {
Configuration c = TEST_UTIL.getConfiguration();
FileSystem fs = FileSystem.get(c);
HFileContext fileContext = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL).build();
StoreFile.Writer w = new StoreFile.WriterBuilder(c, new CacheConfig(c),
StoreFileWriter w = new StoreFileWriter.Builder(c, new CacheConfig(c),
fs)
.withOutputDir(storedir)
.withFileContext(fileContext)

View File

@@ -109,7 +109,7 @@ public class TestStoreFile extends HBaseTestCase {
conf, fs, new Path(testDir, hri.getTable().getNameAsString()), hri);
HFileContext meta = new HFileContextBuilder().withBlockSize(2*1024).build();
StoreFile.Writer writer = new StoreFile.WriterBuilder(conf, cacheConf, this.fs)
StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, this.fs)
.withFilePath(regionFs.createTempName())
.withFileContext(meta)
.build();
@@ -121,7 +121,7 @@ public class TestStoreFile extends HBaseTestCase {
checkHalfHFile(regionFs, sf);
}
private void writeStoreFile(final StoreFile.Writer writer) throws IOException {
private void writeStoreFile(final StoreFileWriter writer) throws IOException {
writeStoreFile(writer, Bytes.toBytes(getName()), Bytes.toBytes(getName()));
}
@@ -134,7 +134,7 @@ public class TestStoreFile extends HBaseTestCase {
* @param writer
* @throws IOException
*/
public static void writeStoreFile(final StoreFile.Writer writer, byte[] fam, byte[] qualifier)
public static void writeStoreFile(final StoreFileWriter writer, byte[] fam, byte[] qualifier)
throws IOException {
long now = System.currentTimeMillis();
try {
@@ -162,7 +162,7 @@ public class TestStoreFile extends HBaseTestCase {
HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build();
// Make a store file and write data to it.
StoreFile.Writer writer = new StoreFile.WriterBuilder(conf, cacheConf, this.fs)
StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, this.fs)
.withFilePath(regionFs.createTempName())
.withFileContext(meta)
.build();
@@ -171,7 +171,7 @@ public class TestStoreFile extends HBaseTestCase {
Path hsfPath = regionFs.commitStoreFile(TEST_FAMILY, writer.getPath());
StoreFile hsf = new StoreFile(this.fs, hsfPath, conf, cacheConf,
BloomType.NONE);
StoreFile.Reader reader = hsf.createReader();
StoreFileReader reader = hsf.createReader();
// Split on a row, not in middle of row. Midkey returned by reader
// may be in middle of row. Create new one with empty column and
// timestamp.
@@ -204,7 +204,7 @@ public class TestStoreFile extends HBaseTestCase {
@Test
public void testEmptyStoreFileRestrictKeyRanges() throws Exception {
StoreFile.Reader reader = mock(StoreFile.Reader.class);
StoreFileReader reader = mock(StoreFileReader.class);
Store store = mock(Store.class);
HColumnDescriptor hcd = mock(HColumnDescriptor.class);
byte[] cf = Bytes.toBytes("ty");
@@ -228,7 +228,7 @@ public class TestStoreFile extends HBaseTestCase {
HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build();
// Make a store file and write data to it.
StoreFile.Writer writer = new StoreFile.WriterBuilder(conf, cacheConf, this.fs)
StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, this.fs)
.withFilePath(regionFs.createTempName())
.withFileContext(meta)
.build();
@@ -273,7 +273,7 @@ public class TestStoreFile extends HBaseTestCase {
HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build();
// Make a store file and write data to it. <root>/<tablename>/<rgn>/<cf>/<file>
StoreFile.Writer writer = new StoreFile.WriterBuilder(testConf, cacheConf, this.fs)
StoreFileWriter writer = new StoreFileWriter.Builder(testConf, cacheConf, this.fs)
.withFilePath(regionFs.createTempName())
.withFileContext(meta)
.build();
@@ -351,9 +351,9 @@ public class TestStoreFile extends HBaseTestCase {
midRow, null);
Path bottomPath = splitStoreFile(regionFs, bottomHri, TEST_FAMILY, f, midRow, false);
// Make readers on top and bottom.
StoreFile.Reader top = new StoreFile(
StoreFileReader top = new StoreFile(
this.fs, topPath, conf, cacheConf, BloomType.NONE).createReader();
StoreFile.Reader bottom = new StoreFile(
StoreFileReader bottom = new StoreFile(
this.fs, bottomPath, conf, cacheConf, BloomType.NONE).createReader();
ByteBuffer previous = null;
LOG.info("Midkey: " + midKV.toString());
@@ -485,7 +485,7 @@ public class TestStoreFile extends HBaseTestCase {
private static final String localFormatter = "%010d";
private void bloomWriteRead(StoreFile.Writer writer, FileSystem fs) throws Exception {
private void bloomWriteRead(StoreFileWriter writer, FileSystem fs) throws Exception {
float err = conf.getFloat(BloomFilterFactory.IO_STOREFILE_BLOOM_ERROR_RATE, 0);
Path f = writer.getPath();
long now = System.currentTimeMillis();
@@ -497,7 +497,7 @@ public class TestStoreFile extends HBaseTestCase {
}
writer.close();
StoreFile.Reader reader = new StoreFile.Reader(fs, f, cacheConf, conf);
StoreFileReader reader = new StoreFileReader(fs, f, cacheConf, conf);
reader.loadFileInfo();
reader.loadBloomfilter();
StoreFileScanner scanner = reader.getStoreFileScanner(false, false);
@@ -545,7 +545,7 @@ public class TestStoreFile extends HBaseTestCase {
.withChecksumType(CKTYPE)
.withBytesPerCheckSum(CKBYTES).build();
// Make a store file and write data to it.
StoreFile.Writer writer = new StoreFile.WriterBuilder(conf, cacheConf, this.fs)
StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, this.fs)
.withFilePath(f)
.withBloomType(BloomType.ROW)
.withMaxKeyCount(2000)
@@ -569,7 +569,7 @@ public class TestStoreFile extends HBaseTestCase {
.withChecksumType(CKTYPE)
.withBytesPerCheckSum(CKBYTES).build();
// Make a store file and write data to it.
StoreFile.Writer writer = new StoreFile.WriterBuilder(conf, cacheConf, this.fs)
StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, this.fs)
.withFilePath(f)
.withMaxKeyCount(2000)
.withFileContext(meta)
@@ -585,7 +585,7 @@ public class TestStoreFile extends HBaseTestCase {
}
writer.close();
StoreFile.Reader reader = new StoreFile.Reader(fs, f, cacheConf, conf);
StoreFileReader reader = new StoreFileReader(fs, f, cacheConf, conf);
reader.loadFileInfo();
reader.loadBloomfilter();
@@ -623,7 +623,7 @@ public class TestStoreFile extends HBaseTestCase {
Path f = new Path(ROOT_DIR, getName());
HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build();
// Make a store file and write data to it.
StoreFile.Writer writer = new StoreFile.WriterBuilder(conf, cacheConf, this.fs)
StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, this.fs)
.withFilePath(f)
.withFileContext(meta)
.build();
@@ -631,7 +631,7 @@ public class TestStoreFile extends HBaseTestCase {
writeStoreFile(writer);
writer.close();
StoreFile.Reader reader = new StoreFile.Reader(fs, f, cacheConf, conf);
StoreFileReader reader = new StoreFileReader(fs, f, cacheConf, conf);
// Now do reseek with empty KV to position to the beginning of the file
@@ -669,7 +669,7 @@ public class TestStoreFile extends HBaseTestCase {
.withChecksumType(CKTYPE)
.withBytesPerCheckSum(CKBYTES).build();
// Make a store file and write data to it.
StoreFile.Writer writer = new StoreFile.WriterBuilder(conf, cacheConf, this.fs)
StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, this.fs)
.withFilePath(f)
.withBloomType(bt[x])
.withMaxKeyCount(expKeys[x])
@@ -691,7 +691,7 @@ public class TestStoreFile extends HBaseTestCase {
}
writer.close();
StoreFile.Reader reader = new StoreFile.Reader(fs, f, cacheConf, conf);
StoreFileReader reader = new StoreFileReader(fs, f, cacheConf, conf);
reader.loadFileInfo();
reader.loadBloomfilter();
StoreFileScanner scanner = reader.getStoreFileScanner(false, false);
@@ -768,7 +768,7 @@ public class TestStoreFile extends HBaseTestCase {
long seqId,
String path) {
StoreFile mock = Mockito.mock(StoreFile.class);
StoreFile.Reader reader = Mockito.mock(StoreFile.Reader.class);
StoreFileReader reader = Mockito.mock(StoreFileReader.class);
Mockito.doReturn(size).when(reader).length();
@@ -825,7 +825,7 @@ public class TestStoreFile extends HBaseTestCase {
Path dir = new Path(storedir, "1234567890");
HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build();
// Make a store file and write data to it.
StoreFile.Writer writer = new StoreFile.WriterBuilder(conf, cacheConf, this.fs)
StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, this.fs)
.withOutputDir(dir)
.withFileContext(meta)
.build();
@@ -845,7 +845,7 @@ public class TestStoreFile extends HBaseTestCase {
HColumnDescriptor hcd = mock(HColumnDescriptor.class);
when(hcd.getName()).thenReturn(family);
when(store.getFamily()).thenReturn(hcd);
StoreFile.Reader reader = hsf.createReader();
StoreFileReader reader = hsf.createReader();
StoreFileScanner scanner = reader.getStoreFileScanner(false, false);
TreeSet<byte[]> columns = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
columns.add(qualifier);
@ -895,13 +895,13 @@ public class TestStoreFile extends HBaseTestCase {
conf.setBoolean(CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY, false);
CacheConfig cacheConf = new CacheConfig(conf);
Path pathCowOff = new Path(baseDir, "123456789");
StoreFile.Writer writer = writeStoreFile(conf, cacheConf, pathCowOff, 3);
StoreFileWriter writer = writeStoreFile(conf, cacheConf, pathCowOff, 3);
StoreFile hsf = new StoreFile(this.fs, writer.getPath(), conf, cacheConf,
BloomType.NONE);
LOG.debug(hsf.getPath().toString());
// Read this file, we should see 3 misses
StoreFile.Reader reader = hsf.createReader();
StoreFileReader reader = hsf.createReader();
reader.loadFileInfo();
StoreFileScanner scanner = reader.getStoreFileScanner(true, true);
scanner.seek(KeyValue.LOWESTKEY);
@ -936,13 +936,13 @@ public class TestStoreFile extends HBaseTestCase {
// Let's read back the two files to ensure the blocks exactly match
hsf = new StoreFile(this.fs, pathCowOff, conf, cacheConf,
BloomType.NONE);
StoreFile.Reader readerOne = hsf.createReader();
StoreFileReader readerOne = hsf.createReader();
readerOne.loadFileInfo();
StoreFileScanner scannerOne = readerOne.getStoreFileScanner(true, true);
scannerOne.seek(KeyValue.LOWESTKEY);
hsf = new StoreFile(this.fs, pathCowOn, conf, cacheConf,
BloomType.NONE);
StoreFile.Reader readerTwo = hsf.createReader();
StoreFileReader readerTwo = hsf.createReader();
readerTwo.loadFileInfo();
StoreFileScanner scannerTwo = readerTwo.getStoreFileScanner(true, true);
scannerTwo.seek(KeyValue.LOWESTKEY);
@ -1010,7 +1010,7 @@ public class TestStoreFile extends HBaseTestCase {
return new Path(new Path(regionDir, family), path.getName());
}
private StoreFile.Writer writeStoreFile(Configuration conf,
private StoreFileWriter writeStoreFile(Configuration conf,
CacheConfig cacheConf, Path path, int numBlocks)
throws IOException {
// Let's put ~5 small KVs in each block, so let's make 5*numBlocks KVs
@ -1030,7 +1030,7 @@ public class TestStoreFile extends HBaseTestCase {
.withBytesPerCheckSum(CKBYTES)
.build();
// Make a store file and write data to it.
StoreFile.Writer writer = new StoreFile.WriterBuilder(conf, cacheConf, this.fs)
StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, this.fs)
.withFilePath(path)
.withMaxKeyCount(2000)
.withFileContext(meta)
@ -1067,7 +1067,7 @@ public class TestStoreFile extends HBaseTestCase {
.withDataBlockEncoding(dataBlockEncoderAlgo)
.build();
// Make a store file and write data to it.
StoreFile.Writer writer = new StoreFile.WriterBuilder(conf, cacheConf, this.fs)
StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, this.fs)
.withFilePath(path)
.withMaxKeyCount(2000)
.withFileContext(meta)
@ -1076,7 +1076,7 @@ public class TestStoreFile extends HBaseTestCase {
StoreFile storeFile = new StoreFile(fs, writer.getPath(), conf,
cacheConf, BloomType.NONE);
StoreFile.Reader reader = storeFile.createReader();
StoreFileReader reader = storeFile.createReader();
Map<byte[], byte[]> fileInfo = reader.loadFileInfo();
byte[] value = fileInfo.get(HFileDataBlockEncoder.DATA_BLOCK_ENCODING);
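Read together, the hunks above give the shape of the renamed API: writers are built through StoreFileWriter.Builder instead of StoreFile.WriterBuilder, and StoreFileReader replaces the nested StoreFile.Reader. Below is a minimal round-trip sketch assembled only from calls that appear in these hunks; the class name, method name and parameters are illustrative, not part of the patch.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.StoreFileReader;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;

public class StoreFileRoundTripSketch {
  // Write a store file with the renamed builder, then open it with StoreFileReader.
  static void roundTrip(Configuration conf, CacheConfig cacheConf, FileSystem fs, Path f)
      throws IOException {
    HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build();
    StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, fs)
        .withFilePath(f)
        .withBloomType(BloomType.ROW)
        .withMaxKeyCount(2000)
        .withFileContext(meta)
        .build();
    // ... append cells here, as the writeStoreFile() helpers in these tests do ...
    writer.close();

    StoreFileReader reader = new StoreFileReader(fs, f, cacheConf, conf);
    reader.loadFileInfo();
    reader.loadBloomfilter();
    StoreFileScanner scanner = reader.getStoreFileScanner(false, false);
    // ... seek/reseek against the scanner, as the assertions in these tests do ...
  }
}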

View File

@ -68,13 +68,13 @@ public class TestStoreFileScannerWithTagCompression {
HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).withIncludesTags(true)
.withCompressTags(true).withDataBlockEncoding(DataBlockEncoding.PREFIX).build();
// Make a store file and write data to it.
StoreFile.Writer writer = new StoreFile.WriterBuilder(conf, cacheConf, fs).withFilePath(f)
StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, fs).withFilePath(f)
.withFileContext(meta).build();
writeStoreFile(writer);
writer.close();
StoreFile.Reader reader = new StoreFile.Reader(fs, f, cacheConf, conf);
StoreFileReader reader = new StoreFileReader(fs, f, cacheConf, conf);
StoreFileScanner s = reader.getStoreFileScanner(false, false);
try {
// Now do reseek with empty KV to position to the beginning of the file
@ -94,7 +94,7 @@ public class TestStoreFileScannerWithTagCompression {
}
}
private void writeStoreFile(final StoreFile.Writer writer) throws IOException {
private void writeStoreFile(final StoreFileWriter writer) throws IOException {
byte[] fam = Bytes.toBytes("f");
byte[] qualifier = Bytes.toBytes("q");
long now = System.currentTimeMillis();

View File

@ -106,7 +106,7 @@ public class TestStripeStoreEngine {
StoreFile sf = mock(StoreFile.class);
when(sf.getMetadataValue(any(byte[].class)))
.thenReturn(StripeStoreFileManager.INVALID_KEY);
when(sf.getReader()).thenReturn(mock(StoreFile.Reader.class));
when(sf.getReader()).thenReturn(mock(StoreFileReader.class));
when(sf.getPath()).thenReturn(new Path("moo"));
return sf;
}

View File

@ -26,6 +26,7 @@ import com.google.common.base.Objects;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileReader;
import org.apache.hadoop.util.StringUtils;
import static org.mockito.Mockito.mock;
@ -62,7 +63,7 @@ class MockStoreFileGenerator {
protected StoreFile createMockStoreFile(final long sizeInBytes, final long seqId) {
StoreFile mockSf = mock(StoreFile.class);
StoreFile.Reader reader = mock(StoreFile.Reader.class);
StoreFileReader reader = mock(StoreFileReader.class);
String stringPath = "/hbase/testTable/regionA/"
+ RandomStringUtils.random(FILENAME_LENGTH, 0, 0, true, true, null, random);
Path path = new Path(stringPath);

View File

@ -46,7 +46,9 @@ import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileReader;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.StripeMultiFileWriter;
import org.apache.hadoop.hbase.util.Bytes;
import org.mockito.invocation.InvocationOnMock;
@ -58,7 +60,7 @@ public class TestCompactor {
// "Files" are totally unused, it's Scanner class below that gives compactor fake KVs.
// But compaction depends on everything under the sun, so stub everything with dummies.
StoreFile sf = mock(StoreFile.class);
StoreFile.Reader r = mock(StoreFile.Reader.class);
StoreFileReader r = mock(StoreFileReader.class);
when(r.length()).thenReturn(1L);
when(r.getBloomFilterType()).thenReturn(BloomType.NONE);
when(r.getHFileReader()).thenReturn(mock(HFile.Reader.class));
@ -78,7 +80,7 @@ public class TestCompactor {
// StoreFileWriter has a private ctor and is unwieldy, so this has to be convoluted.
public static class StoreFileWritersCapture
implements Answer<StoreFile.Writer>, StripeMultiFileWriter.WriterFactory {
implements Answer<StoreFileWriter>, StripeMultiFileWriter.WriterFactory {
public static class Writer {
public ArrayList<KeyValue> kvs = new ArrayList<KeyValue>();
public TreeMap<byte[], byte[]> data = new TreeMap<byte[], byte[]>(Bytes.BYTES_COMPARATOR);
@ -88,10 +90,10 @@ public class TestCompactor {
private List<Writer> writers = new ArrayList<Writer>();
@Override
public StoreFile.Writer createWriter() throws IOException {
public StoreFileWriter createWriter() throws IOException {
final Writer realWriter = new Writer();
writers.add(realWriter);
StoreFile.Writer writer = mock(StoreFile.Writer.class);
StoreFileWriter writer = mock(StoreFileWriter.class);
doAnswer(new Answer<Object>() {
public Object answer(InvocationOnMock invocation) {
return realWriter.kvs.add((KeyValue) invocation.getArguments()[0]);
@ -120,7 +122,7 @@ public class TestCompactor {
}
@Override
public StoreFile.Writer answer(InvocationOnMock invocation) throws Throwable {
public StoreFileWriter answer(InvocationOnMock invocation) throws Throwable {
return createWriter();
}
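The capture class above exists because the writer cannot be constructed directly in a unit test, so the factory hands out Mockito mocks whose append() calls are recorded. A condensed sketch of the same trick under the same assumptions (the helper name and the captured list are illustrative, not part of the patch):

import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;

public class CapturingWriterSketch {
  // Returns a mocked StoreFileWriter that writes nothing to disk but records
  // every KeyValue passed to append() into the supplied list.
  static StoreFileWriter capturingWriter(final List<KeyValue> captured) throws IOException {
    StoreFileWriter writer = mock(StoreFileWriter.class);
    doAnswer(new Answer<Object>() {
      @Override
      public Object answer(InvocationOnMock invocation) {
        return captured.add((KeyValue) invocation.getArguments()[0]);
      }
    }).when(writer).append(any(KeyValue.class));
    return writer;
  }
}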

View File

@ -60,6 +60,7 @@ import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileReader;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.StripeMultiFileWriter;
import org.apache.hadoop.hbase.regionserver.StripeStoreConfig;
@ -744,7 +745,7 @@ public class TestStripeCompactionPolicy {
private static StoreFile createFile(long size) throws Exception {
StoreFile sf = mock(StoreFile.class);
when(sf.getPath()).thenReturn(new Path("moo"));
StoreFile.Reader r = mock(StoreFile.Reader.class);
StoreFileReader r = mock(StoreFileReader.class);
when(r.getEntries()).thenReturn(size);
when(r.length()).thenReturn(size);
when(r.getBloomFilterType()).thenReturn(BloomType.NONE);

View File

@ -28,7 +28,7 @@ import org.apache.hadoop.hbase.io.compress.Compression
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding
import org.apache.hadoop.hbase.io.hfile.{CacheConfig, HFileContextBuilder, HFileWriterImpl}
import org.apache.hadoop.hbase.regionserver.{HStore, StoreFile, BloomType}
import org.apache.hadoop.hbase.regionserver.{HStore, StoreFile, StoreFileWriter, BloomType}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.broadcast.Broadcast
@ -893,7 +893,7 @@ class HBaseContext(@transient sc: SparkContext,
// Add a '_' to the file name because this is an unfinished file. A rename will happen
// to remove the '_' when the file is closed.
new WriterLength(0,
new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), new HFileSystem(fs))
new StoreFileWriter.Builder(conf, new CacheConfig(tempConf), new HFileSystem(fs))
.withBloomType(BloomType.valueOf(familyOptions.bloomType))
.withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext)
.withFilePath(new Path(familydir, "_" + UUID.randomUUID.toString.replaceAll("-", "")))
@ -1048,7 +1048,7 @@ class HBaseContext(@transient sc: SparkContext,
* @param compactionExclude The exclude compaction metadata flag for the HFile
*/
private def closeHFileWriter(fs:FileSystem,
w: StoreFile.Writer,
w: StoreFileWriter,
regionSplitPartitioner: BulkLoadPartitioner,
previousRow: Array[Byte],
compactionExclude: Boolean): Unit = {
@ -1079,13 +1079,13 @@ class HBaseContext(@transient sc: SparkContext,
}
/**
* This is a wrapper class around StoreFile.Writer. The reason for the
* This is a wrapper class around StoreFileWriter. The reason for the
* wrapper is to keep the length of the file alongside the writer
*
* @param written The number of bytes written to the writer
* @param writer The writer to be wrapped
*/
class WriterLength(var written:Long, val writer:StoreFile.Writer)
class WriterLength(var written:Long, val writer:StoreFileWriter)
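WriterLength is nothing more than a mutable byte count carried next to the writer that produced those bytes. A rough Java rendering of the same idea, purely illustrative (the class and accessor names below are not part of the patch):

import org.apache.hadoop.hbase.regionserver.StoreFileWriter;

// Rough Java equivalent of the Scala WriterLength above: the running number of
// bytes written, kept alongside the StoreFileWriter that wrote them.
public class WriterLengthSketch {
  private long written;
  private final StoreFileWriter writer;

  public WriterLengthSketch(long written, StoreFileWriter writer) {
    this.written = written;
    this.writer = writer;
  }

  public void addWritten(long bytes) { this.written += bytes; }
  public long getWritten() { return written; }
  public StoreFileWriter getWriter() { return writer; }
}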
}
object LatestHBaseContextCache {