Backport "HBASE-25249 Adding StoreContext" to branch-2 (#2869)
Co-authored-by: Abhishek Khanna <akkhanna@amazon.com> Signed-off-by: Zach York <zyork@apache.org>
This commit is contained in:
parent
f429cfac9a
commit
83a6ca54bd
|
@ -70,6 +70,7 @@ import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl;
|
|||
import org.apache.hadoop.hbase.regionserver.BloomType;
|
||||
import org.apache.hadoop.hbase.regionserver.HStore;
|
||||
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
|
||||
import org.apache.hadoop.hbase.regionserver.StoreUtils;
|
||||
import org.apache.hadoop.hbase.util.BloomFilterUtil;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.CommonFSUtils;
|
||||
|
@ -386,16 +387,15 @@ public class HFileOutputFormat2
|
|||
DataBlockEncoding encoding = overriddenEncoding;
|
||||
encoding = encoding == null ? datablockEncodingMap.get(tableAndFamily) : encoding;
|
||||
encoding = encoding == null ? DataBlockEncoding.NONE : encoding;
|
||||
HFileContextBuilder contextBuilder = new HFileContextBuilder()
|
||||
.withCompression(compression).withChecksumType(HStore.getChecksumType(conf))
|
||||
.withBytesPerCheckSum(HStore.getBytesPerChecksum(conf)).withBlockSize(blockSize)
|
||||
HFileContextBuilder contextBuilder = new HFileContextBuilder().withCompression(compression)
|
||||
.withDataBlockEncoding(encoding).withChecksumType(StoreUtils.getChecksumType(conf))
|
||||
.withBytesPerCheckSum(StoreUtils.getBytesPerChecksum(conf)).withBlockSize(blockSize)
|
||||
.withColumnFamily(family).withTableName(tableName);
|
||||
|
||||
if (HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS) {
|
||||
contextBuilder.withIncludesTags(true);
|
||||
}
|
||||
|
||||
contextBuilder.withDataBlockEncoding(encoding);
|
||||
HFileContext hFileContext = contextBuilder.build();
|
||||
if (null == favoredNodes) {
|
||||
wl.writer = new StoreFileWriter.Builder(conf, CacheConfig.DISABLED, fs)
|
||||
|
|
|
@ -64,9 +64,9 @@ import org.apache.hadoop.hbase.mob.compactions.MobCompactor;
|
|||
import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.CompactionPartitionId;
|
||||
import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactor;
|
||||
import org.apache.hadoop.hbase.regionserver.BloomType;
|
||||
import org.apache.hadoop.hbase.regionserver.HStore;
|
||||
import org.apache.hadoop.hbase.regionserver.HStoreFile;
|
||||
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
|
||||
import org.apache.hadoop.hbase.regionserver.StoreUtils;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.ChecksumType;
|
||||
import org.apache.hadoop.hbase.util.CommonFSUtils;
|
||||
|
@ -581,7 +581,7 @@ public final class MobUtils {
|
|||
return createWriter(conf, fs, family,
|
||||
new Path(basePath, UUID.randomUUID().toString().replaceAll("-", "")), maxKeyCount,
|
||||
family.getCompactionCompressionType(), cacheConfig, cryptoContext,
|
||||
HStore.getChecksumType(conf), HStore.getBytesPerChecksum(conf), family.getBlocksize(),
|
||||
StoreUtils.getChecksumType(conf), StoreUtils.getBytesPerChecksum(conf), family.getBlocksize(),
|
||||
family.getBloomFilterType(), isCompaction);
|
||||
}
|
||||
|
||||
|
@ -658,7 +658,7 @@ public final class MobUtils {
|
|||
throws IOException {
|
||||
return createWriter(conf, fs, family,
|
||||
new Path(basePath, mobFileName.getFileName()), maxKeyCount, compression, cacheConfig,
|
||||
cryptoContext, HStore.getChecksumType(conf), HStore.getBytesPerChecksum(conf),
|
||||
cryptoContext, StoreUtils.getChecksumType(conf), StoreUtils.getBytesPerChecksum(conf),
|
||||
family.getBlocksize(), BloomType.NONE, isCompaction);
|
||||
}
|
||||
|
||||
|
|
|
@ -93,7 +93,6 @@ public class HMobStore extends HStore {
|
|||
private AtomicLong mobFlushedCellsSize = new AtomicLong();
|
||||
private AtomicLong mobScanCellsCount = new AtomicLong();
|
||||
private AtomicLong mobScanCellsSize = new AtomicLong();
|
||||
private ColumnFamilyDescriptor family;
|
||||
private Map<String, List<Path>> map = new ConcurrentHashMap<>();
|
||||
private final IdLock keyLock = new IdLock();
|
||||
// When we add a MOB reference cell to the HFile, we will add 2 tags along with it
|
||||
|
@ -107,11 +106,10 @@ public class HMobStore extends HStore {
|
|||
public HMobStore(final HRegion region, final ColumnFamilyDescriptor family,
|
||||
final Configuration confParam, boolean warmup) throws IOException {
|
||||
super(region, family, confParam, warmup);
|
||||
this.family = family;
|
||||
this.mobFileCache = region.getMobFileCache();
|
||||
this.homePath = MobUtils.getMobHome(conf);
|
||||
this.mobFamilyPath = MobUtils.getMobFamilyPath(conf, this.getTableName(),
|
||||
family.getNameAsString());
|
||||
family.getNameAsString());
|
||||
List<Path> locations = new ArrayList<>(2);
|
||||
locations.add(mobFamilyPath);
|
||||
TableName tn = region.getTableDescriptor().getTableName();
|
||||
|
@ -248,9 +246,11 @@ public class HMobStore extends HStore {
|
|||
public StoreFileWriter createWriterInTmp(MobFileName mobFileName, Path basePath,
|
||||
long maxKeyCount, Compression.Algorithm compression,
|
||||
boolean isCompaction) throws IOException {
|
||||
return MobUtils.createWriter(conf, region.getFilesystem(), family,
|
||||
new Path(basePath, mobFileName.getFileName()), maxKeyCount, compression, cacheConf,
|
||||
cryptoContext, checksumType, bytesPerChecksum, blocksize, BloomType.NONE, isCompaction);
|
||||
return MobUtils.createWriter(conf, getFileSystem(), getColumnFamilyDescriptor(),
|
||||
new Path(basePath, mobFileName.getFileName()), maxKeyCount, compression, getCacheConfig(),
|
||||
getStoreContext().getEncryptionContext(), StoreUtils.getChecksumType(conf),
|
||||
StoreUtils.getBytesPerChecksum(conf), getStoreContext().getBlockSize(), BloomType.NONE,
|
||||
isCompaction);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -268,10 +268,10 @@ public class HMobStore extends HStore {
|
|||
String msg = "Renaming flushed file from " + sourceFile + " to " + dstPath;
|
||||
LOG.info(msg);
|
||||
Path parent = dstPath.getParent();
|
||||
if (!region.getFilesystem().exists(parent)) {
|
||||
region.getFilesystem().mkdirs(parent);
|
||||
if (!getFileSystem().exists(parent)) {
|
||||
getFileSystem().mkdirs(parent);
|
||||
}
|
||||
if (!region.getFilesystem().rename(sourceFile, dstPath)) {
|
||||
if (!getFileSystem().rename(sourceFile, dstPath)) {
|
||||
throw new IOException("Failed rename of " + sourceFile + " to " + dstPath);
|
||||
}
|
||||
}
|
||||
|
@ -284,7 +284,7 @@ public class HMobStore extends HStore {
|
|||
private void validateMobFile(Path path) throws IOException {
|
||||
HStoreFile storeFile = null;
|
||||
try {
|
||||
storeFile = new HStoreFile(region.getFilesystem(), path, conf, this.cacheConf,
|
||||
storeFile = new HStoreFile(getFileSystem(), path, conf, getCacheConfig(),
|
||||
BloomType.NONE, isPrimaryReplicaStore());
|
||||
storeFile.initReader();
|
||||
} catch (IOException e) {
|
||||
|
@ -335,9 +335,9 @@ public class HMobStore extends HStore {
|
|||
if (locations == null) {
|
||||
locations = new ArrayList<>(2);
|
||||
TableName tn = TableName.valueOf(tableNameString);
|
||||
locations.add(MobUtils.getMobFamilyPath(conf, tn, family.getNameAsString()));
|
||||
locations.add(MobUtils.getMobFamilyPath(conf, tn, getColumnFamilyName()));
|
||||
locations.add(HFileArchiveUtil.getStoreArchivePath(conf, tn,
|
||||
MobUtils.getMobRegionInfo(tn).getEncodedName(), family.getNameAsString()));
|
||||
MobUtils.getMobRegionInfo(tn).getEncodedName(), getColumnFamilyName()));
|
||||
map.put(tableNameString, locations);
|
||||
}
|
||||
} finally {
|
||||
|
@ -390,7 +390,7 @@ public class HMobStore extends HStore {
|
|||
MobFile file = null;
|
||||
Path path = new Path(location, fileName);
|
||||
try {
|
||||
file = mobFileCache.openFile(fs, path, cacheConf);
|
||||
file = mobFileCache.openFile(fs, path, getCacheConfig());
|
||||
return readPt != -1 ? file.readCell(search, cacheMobBlocks, readPt)
|
||||
: file.readCell(search, cacheMobBlocks);
|
||||
} catch (IOException e) {
|
||||
|
|
|
@ -97,7 +97,6 @@ import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
|
|||
import org.apache.hadoop.hbase.security.EncryptionUtil;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.ChecksumType;
|
||||
import org.apache.hadoop.hbase.util.ClassSize;
|
||||
import org.apache.hadoop.hbase.util.CommonFSUtils;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
|
@ -157,11 +156,8 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
|
|||
|
||||
protected final MemStore memstore;
|
||||
// This stores directory in the filesystem.
|
||||
protected final HRegion region;
|
||||
private final ColumnFamilyDescriptor family;
|
||||
private final HRegionFileSystem fs;
|
||||
private final HRegion region;
|
||||
protected Configuration conf;
|
||||
protected CacheConfig cacheConf;
|
||||
private long lastCompactSize = 0;
|
||||
volatile boolean forceMajor = false;
|
||||
/* how many bytes to write between status checks */
|
||||
|
@ -217,16 +213,8 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
|
|||
private final Set<ChangedReadersObserver> changedReaderObservers =
|
||||
Collections.newSetFromMap(new ConcurrentHashMap<ChangedReadersObserver, Boolean>());
|
||||
|
||||
protected final int blocksize;
|
||||
private HFileDataBlockEncoder dataBlockEncoder;
|
||||
|
||||
/** Checksum configuration */
|
||||
protected ChecksumType checksumType;
|
||||
protected int bytesPerChecksum;
|
||||
|
||||
// Comparing KeyValues
|
||||
protected final CellComparator comparator;
|
||||
|
||||
final StoreEngine<?, ?, ?, ?> storeEngine;
|
||||
|
||||
private static final AtomicBoolean offPeakCompactionTracker = new AtomicBoolean();
|
||||
|
@ -238,7 +226,6 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
|
|||
|
||||
private long blockingFileCount;
|
||||
private int compactionCheckMultiplier;
|
||||
protected Encryption.Context cryptoContext = Encryption.Context.NONE;
|
||||
|
||||
private AtomicLong flushedCellsCount = new AtomicLong();
|
||||
private AtomicLong compactedCellsCount = new AtomicLong();
|
||||
|
@ -248,6 +235,8 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
|
|||
private AtomicLong compactedCellsSize = new AtomicLong();
|
||||
private AtomicLong majorCompactedCellsSize = new AtomicLong();
|
||||
|
||||
private final StoreContext storeContext;
|
||||
|
||||
/**
|
||||
* Constructor
|
||||
* @param family HColumnDescriptor for this column
|
||||
|
@ -256,12 +245,6 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
|
|||
protected HStore(final HRegion region, final ColumnFamilyDescriptor family,
|
||||
final Configuration confParam, boolean warmup) throws IOException {
|
||||
|
||||
this.fs = region.getRegionFileSystem();
|
||||
|
||||
// Assemble the store's home directory and Ensure it exists.
|
||||
fs.createStoreDir(family.getNameAsString());
|
||||
this.region = region;
|
||||
this.family = family;
|
||||
// 'conf' renamed to 'confParam' b/c we use this.conf in the constructor
|
||||
// CompoundConfiguration will look for keys in reverse order of addition, so we'd
|
||||
// add global config first, then table and cf overrides, then cf metadata.
|
||||
|
@ -270,18 +253,22 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
|
|||
.addBytesMap(region.getTableDescriptor().getValues())
|
||||
.addStringMap(family.getConfiguration())
|
||||
.addBytesMap(family.getValues());
|
||||
this.blocksize = family.getBlocksize();
|
||||
|
||||
this.region = region;
|
||||
this.storeContext = initializeStoreContext(family);
|
||||
|
||||
// Assemble the store's home directory and Ensure it exists.
|
||||
region.getRegionFileSystem().createStoreDir(family.getNameAsString());
|
||||
|
||||
// set block storage policy for store directory
|
||||
String policyName = family.getStoragePolicy();
|
||||
if (null == policyName) {
|
||||
policyName = this.conf.get(BLOCK_STORAGE_POLICY_KEY, DEFAULT_BLOCK_STORAGE_POLICY);
|
||||
}
|
||||
this.fs.setStoragePolicy(family.getNameAsString(), policyName.trim());
|
||||
region.getRegionFileSystem().setStoragePolicy(family.getNameAsString(), policyName.trim());
|
||||
|
||||
this.dataBlockEncoder = new HFileDataBlockEncoderImpl(family.getDataBlockEncoding());
|
||||
|
||||
this.comparator = region.getCellComparator();
|
||||
// used by ScanQueryMatcher
|
||||
long timeToPurgeDeletes =
|
||||
Math.max(conf.getLong("hbase.hstore.time.to.purge.deletes", 0), 0);
|
||||
|
@ -290,14 +277,11 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
|
|||
long ttl = determineTTLFromFamily(family);
|
||||
// Why not just pass a HColumnDescriptor in here altogether? Even if have
|
||||
// to clone it?
|
||||
scanInfo = new ScanInfo(conf, family, ttl, timeToPurgeDeletes, this.comparator);
|
||||
scanInfo = new ScanInfo(conf, family, ttl, timeToPurgeDeletes, region.getCellComparator());
|
||||
this.memstore = getMemstore();
|
||||
|
||||
this.offPeakHours = OffPeakHours.getInstance(conf);
|
||||
|
||||
// Setting up cache configuration for this family
|
||||
createCacheConf(family);
|
||||
|
||||
this.verifyBulkLoads = conf.getBoolean("hbase.hstore.bulkload.verify", false);
|
||||
|
||||
this.blockingFileCount =
|
||||
|
@ -315,7 +299,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
|
|||
"hbase.hstore.close.check.interval", 10*1000*1000 /* 10 MB */);
|
||||
}
|
||||
|
||||
this.storeEngine = createStoreEngine(this, this.conf, this.comparator);
|
||||
this.storeEngine = createStoreEngine(this, this.conf, region.getCellComparator());
|
||||
List<HStoreFile> hStoreFiles = loadStoreFiles(warmup);
|
||||
// Move the storeSize calculation out of loadStoreFiles() method, because the secondary read
|
||||
// replica's refreshStoreFiles() will also use loadStoreFiles() to refresh its store files and
|
||||
|
@ -325,10 +309,6 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
|
|||
this.totalUncompressedBytes.addAndGet(getTotalUncompressedBytes(hStoreFiles));
|
||||
this.storeEngine.getStoreFileManager().loadFiles(hStoreFiles);
|
||||
|
||||
// Initialize checksum type from name. The names are CRC32, CRC32C, etc.
|
||||
this.checksumType = getChecksumType(conf);
|
||||
// Initialize bytes per checksum
|
||||
this.bytesPerChecksum = getBytesPerChecksum(conf);
|
||||
flushRetriesNumber = conf.getInt(
|
||||
"hbase.hstore.flush.retries.number", DEFAULT_FLUSH_RETRIES_NUMBER);
|
||||
pauseTime = conf.getInt(HConstants.HBASE_SERVER_PAUSE, HConstants.DEFAULT_HBASE_SERVER_PAUSE);
|
||||
|
@ -337,7 +317,6 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
|
|||
"hbase.hstore.flush.retries.number must be > 0, not "
|
||||
+ flushRetriesNumber);
|
||||
}
|
||||
cryptoContext = EncryptionUtil.createEncryptionContext(conf, family);
|
||||
|
||||
int confPrintThreshold =
|
||||
this.conf.getInt("hbase.region.store.parallel.put.print.threshold", 50);
|
||||
|
@ -354,6 +333,32 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
|
|||
cacheOnWriteLogged = false;
|
||||
}
|
||||
|
||||
private StoreContext initializeStoreContext(ColumnFamilyDescriptor family) throws IOException {
|
||||
return new StoreContext.Builder()
|
||||
.withBlockSize(family.getBlocksize())
|
||||
.withEncryptionContext(EncryptionUtil.createEncryptionContext(conf, family))
|
||||
.withBloomType(family.getBloomFilterType())
|
||||
.withCacheConfig(createCacheConf(family))
|
||||
.withCellComparator(region.getCellComparator())
|
||||
.withColumnFamilyDescriptor(family)
|
||||
.withCompactedFilesSupplier(this::getCompactedFiles)
|
||||
.withRegionFileSystem(region.getRegionFileSystem())
|
||||
.withFavoredNodesSupplier(this::getFavoredNodes)
|
||||
.withFamilyStoreDirectoryPath(region.getRegionFileSystem()
|
||||
.getStoreDir(family.getNameAsString()))
|
||||
.withRegionCoprocessorHost(region.getCoprocessorHost())
|
||||
.build();
|
||||
}
|
||||
|
||||
private InetSocketAddress[] getFavoredNodes() {
|
||||
InetSocketAddress[] favoredNodes = null;
|
||||
if (region.getRegionServerServices() != null) {
|
||||
favoredNodes = region.getRegionServerServices().getFavoredNodesForRegion(
|
||||
region.getRegionInfo().getEncodedName());
|
||||
}
|
||||
return favoredNodes;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return MemStore Instance to use in this store.
|
||||
*/
|
||||
|
@ -365,7 +370,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
|
|||
inMemoryCompaction = MemoryCompactionPolicy
|
||||
.valueOf(conf.get("hbase.systemtables.compacting.memstore.type", "NONE").toUpperCase());
|
||||
} else {
|
||||
inMemoryCompaction = family.getInMemoryCompaction();
|
||||
inMemoryCompaction = getColumnFamilyDescriptor().getInMemoryCompaction();
|
||||
}
|
||||
if (inMemoryCompaction == null) {
|
||||
inMemoryCompaction =
|
||||
|
@ -375,13 +380,13 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
|
|||
switch (inMemoryCompaction) {
|
||||
case NONE:
|
||||
ms = ReflectionUtils.newInstance(DefaultMemStore.class,
|
||||
new Object[] { conf, this.comparator,
|
||||
new Object[] { conf, getComparator(),
|
||||
this.getHRegion().getRegionServicesForStores()});
|
||||
break;
|
||||
default:
|
||||
Class<? extends CompactingMemStore> clz = conf.getClass(MEMSTORE_CLASS_NAME,
|
||||
CompactingMemStore.class, CompactingMemStore.class);
|
||||
ms = ReflectionUtils.newInstance(clz, new Object[]{conf, this.comparator, this,
|
||||
ms = ReflectionUtils.newInstance(clz, new Object[]{conf, getComparator(), this,
|
||||
this.getHRegion().getRegionServicesForStores(), inMemoryCompaction});
|
||||
}
|
||||
return ms;
|
||||
|
@ -391,10 +396,12 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
|
|||
* Creates the cache config.
|
||||
* @param family The current column family.
|
||||
*/
|
||||
protected void createCacheConf(final ColumnFamilyDescriptor family) {
|
||||
this.cacheConf = new CacheConfig(conf, family, region.getBlockCache(),
|
||||
protected CacheConfig createCacheConf(final ColumnFamilyDescriptor family) {
|
||||
CacheConfig cacheConf = new CacheConfig(conf, family, region.getBlockCache(),
|
||||
region.getRegionServicesForStores().getByteBuffAllocator());
|
||||
LOG.info("Created cacheConfig: " + this.getCacheConfig() + " for " + this);
|
||||
LOG.info("Created cacheConfig: {}, for column family {} of region {} ", cacheConf,
|
||||
family.getNameAsString(), region.getRegionInfo().getEncodedName());
|
||||
return cacheConf;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -407,7 +414,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
|
|||
*/
|
||||
protected StoreEngine<?, ?, ?, ?> createStoreEngine(HStore store, Configuration conf,
|
||||
CellComparator kvComparator) throws IOException {
|
||||
return StoreEngine.create(store, conf, comparator);
|
||||
return StoreEngine.create(store, conf, kvComparator);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -428,9 +435,13 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
|
|||
return ttl;
|
||||
}
|
||||
|
||||
StoreContext getStoreContext() {
|
||||
return storeContext;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getColumnFamilyName() {
|
||||
return this.family.getNameAsString();
|
||||
return this.storeContext.getFamily().getNameAsString();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -440,11 +451,11 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
|
|||
|
||||
@Override
|
||||
public FileSystem getFileSystem() {
|
||||
return this.fs.getFileSystem();
|
||||
return storeContext.getRegionFileSystem().getFileSystem();
|
||||
}
|
||||
|
||||
public HRegionFileSystem getRegionFileSystem() {
|
||||
return this.fs;
|
||||
return storeContext.getRegionFileSystem();
|
||||
}
|
||||
|
||||
/* Implementation of StoreConfigInformation */
|
||||
|
@ -481,29 +492,6 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
|
|||
}
|
||||
/* End implementation of StoreConfigInformation */
|
||||
|
||||
/**
|
||||
* Returns the configured bytesPerChecksum value.
|
||||
* @param conf The configuration
|
||||
* @return The bytesPerChecksum that is set in the configuration
|
||||
*/
|
||||
public static int getBytesPerChecksum(Configuration conf) {
|
||||
return conf.getInt(HConstants.BYTES_PER_CHECKSUM,
|
||||
HFile.DEFAULT_BYTES_PER_CHECKSUM);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the configured checksum algorithm.
|
||||
* @param conf The configuration
|
||||
* @return The checksum algorithm that is set in the configuration
|
||||
*/
|
||||
public static ChecksumType getChecksumType(Configuration conf) {
|
||||
String checksumName = conf.get(HConstants.CHECKSUM_TYPE_NAME);
|
||||
if (checksumName == null) {
|
||||
return ChecksumType.getDefaultChecksumType();
|
||||
} else {
|
||||
return ChecksumType.nameToType(checksumName);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @return how many bytes to write between status checks
|
||||
|
@ -514,7 +502,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
|
|||
|
||||
@Override
|
||||
public ColumnFamilyDescriptor getColumnFamilyDescriptor() {
|
||||
return this.family;
|
||||
return this.storeContext.getFamily();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -573,7 +561,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
|
|||
* from the given directory.
|
||||
*/
|
||||
private List<HStoreFile> loadStoreFiles(boolean warmup) throws IOException {
|
||||
Collection<StoreFileInfo> files = fs.getStoreFiles(getColumnFamilyName());
|
||||
Collection<StoreFileInfo> files = getRegionFileSystem().getStoreFiles(getColumnFamilyName());
|
||||
return openStoreFiles(files, warmup);
|
||||
}
|
||||
|
||||
|
@ -624,7 +612,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
|
|||
if (ioe != null) {
|
||||
// close StoreFile readers
|
||||
boolean evictOnClose =
|
||||
cacheConf != null? cacheConf.shouldEvictOnClose(): true;
|
||||
getCacheConfig() != null? getCacheConfig().shouldEvictOnClose(): true;
|
||||
for (HStoreFile file : results) {
|
||||
try {
|
||||
if (file != null) {
|
||||
|
@ -652,7 +640,8 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
|
|||
results.removeAll(filesToRemove);
|
||||
if (!filesToRemove.isEmpty() && this.isPrimaryReplicaStore()) {
|
||||
LOG.debug("Moving the files {} to archive", filesToRemove);
|
||||
this.fs.removeStoreFiles(this.getColumnFamilyDescriptor().getNameAsString(), filesToRemove);
|
||||
getRegionFileSystem().removeStoreFiles(this.getColumnFamilyDescriptor().getNameAsString(),
|
||||
filesToRemove);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -661,7 +650,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
|
|||
|
||||
@Override
|
||||
public void refreshStoreFiles() throws IOException {
|
||||
Collection<StoreFileInfo> newFiles = fs.getStoreFiles(getColumnFamilyName());
|
||||
Collection<StoreFileInfo> newFiles = getRegionFileSystem().getStoreFiles(getColumnFamilyName());
|
||||
refreshStoreFilesInternal(newFiles);
|
||||
}
|
||||
|
||||
|
@ -672,7 +661,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
|
|||
public void refreshStoreFiles(Collection<String> newFiles) throws IOException {
|
||||
List<StoreFileInfo> storeFiles = new ArrayList<>(newFiles.size());
|
||||
for (String file : newFiles) {
|
||||
storeFiles.add(fs.getStoreFileInfo(getColumnFamilyName(), file));
|
||||
storeFiles.add(getRegionFileSystem().getStoreFileInfo(getColumnFamilyName(), file));
|
||||
}
|
||||
refreshStoreFilesInternal(storeFiles);
|
||||
}
|
||||
|
@ -749,7 +738,8 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
|
|||
|
||||
private HStoreFile createStoreFileAndReader(StoreFileInfo info) throws IOException {
|
||||
info.setRegionCoprocessorHost(this.region.getCoprocessorHost());
|
||||
HStoreFile storeFile = new HStoreFile(info, this.family.getBloomFilterType(), this.cacheConf);
|
||||
HStoreFile storeFile = new HStoreFile(info, getColumnFamilyDescriptor().getBloomFilterType(),
|
||||
getCacheConfig());
|
||||
storeFile.initReader();
|
||||
return storeFile;
|
||||
}
|
||||
|
@ -832,7 +822,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
|
|||
LOG.info("Validating hfile at " + srcPath + " for inclusion in " + this);
|
||||
FileSystem srcFs = srcPath.getFileSystem(conf);
|
||||
srcFs.access(srcPath, FsAction.READ_WRITE);
|
||||
reader = HFile.createReader(srcFs, srcPath, cacheConf, isPrimaryReplicaStore(), conf);
|
||||
reader = HFile.createReader(srcFs, srcPath, getCacheConfig(), isPrimaryReplicaStore(), conf);
|
||||
|
||||
Optional<byte[]> firstKey = reader.getFirstRowKey();
|
||||
Preconditions.checkState(firstKey.isPresent(), "First key can not be null");
|
||||
|
@ -869,7 +859,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
|
|||
do {
|
||||
Cell cell = scanner.getCell();
|
||||
if (prevCell != null) {
|
||||
if (comparator.compareRows(prevCell, cell) > 0) {
|
||||
if (getComparator().compareRows(prevCell, cell) > 0) {
|
||||
throw new InvalidHFileException("Previous row is greater than"
|
||||
+ " current row: path=" + srcPath + " previous="
|
||||
+ CellUtil.getCellKeyAsString(prevCell) + " current="
|
||||
|
@ -906,13 +896,13 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
|
|||
*/
|
||||
public Pair<Path, Path> preBulkLoadHFile(String srcPathStr, long seqNum) throws IOException {
|
||||
Path srcPath = new Path(srcPathStr);
|
||||
return fs.bulkLoadStoreFile(getColumnFamilyName(), srcPath, seqNum);
|
||||
return getRegionFileSystem().bulkLoadStoreFile(getColumnFamilyName(), srcPath, seqNum);
|
||||
}
|
||||
|
||||
public Path bulkLoadHFile(byte[] family, String srcPathStr, Path dstPath) throws IOException {
|
||||
Path srcPath = new Path(srcPathStr);
|
||||
try {
|
||||
fs.commitStoreFile(srcPath, dstPath);
|
||||
getRegionFileSystem().commitStoreFile(srcPath, dstPath);
|
||||
} finally {
|
||||
if (this.getCoprocessorHost() != null) {
|
||||
this.getCoprocessorHost().postCommitStoreFile(family, srcPath, dstPath);
|
||||
|
@ -978,8 +968,8 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
|
|||
storeEngine.getStoreFileManager().clearCompactedFiles();
|
||||
// clear the compacted files
|
||||
if (CollectionUtils.isNotEmpty(compactedfiles)) {
|
||||
removeCompactedfiles(compactedfiles, cacheConf != null ?
|
||||
cacheConf.shouldEvictOnClose() : true);
|
||||
removeCompactedfiles(compactedfiles, getCacheConfig() != null ?
|
||||
getCacheConfig().shouldEvictOnClose() : true);
|
||||
}
|
||||
if (!result.isEmpty()) {
|
||||
// initialize the thread pool for closing store files in parallel.
|
||||
|
@ -995,7 +985,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
|
|||
@Override
|
||||
public Void call() throws IOException {
|
||||
boolean evictOnClose =
|
||||
cacheConf != null? cacheConf.shouldEvictOnClose(): true;
|
||||
getCacheConfig() != null? getCacheConfig().shouldEvictOnClose(): true;
|
||||
f.closeStoreFile(evictOnClose);
|
||||
return null;
|
||||
}
|
||||
|
@ -1106,7 +1096,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
|
|||
FileSystem srcFs = path.getFileSystem(conf);
|
||||
srcFs.access(path, FsAction.READ_WRITE);
|
||||
try (HFile.Reader reader =
|
||||
HFile.createReader(srcFs, path, cacheConf, isPrimaryReplicaStore(), conf)) {
|
||||
HFile.createReader(srcFs, path, getCacheConfig(), isPrimaryReplicaStore(), conf)) {
|
||||
Optional<byte[]> firstKey = reader.getFirstRowKey();
|
||||
Preconditions.checkState(firstKey.isPresent(), "First key can not be null");
|
||||
Optional<Cell> lk = reader.getLastKey();
|
||||
|
@ -1118,7 +1108,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
|
|||
}
|
||||
}
|
||||
|
||||
Path dstPath = fs.commitStoreFile(getColumnFamilyName(), path);
|
||||
Path dstPath = getRegionFileSystem().commitStoreFile(getColumnFamilyName(), path);
|
||||
HStoreFile sf = createStoreFileAndReader(dstPath);
|
||||
StoreFileReader r = sf.getReader();
|
||||
this.storeSize.addAndGet(r.length());
|
||||
|
@ -1143,7 +1133,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
|
|||
private HStoreFile commitFile(Path path, long logCacheFlushId, MonitoredTask status)
|
||||
throws IOException {
|
||||
// Write-out finished successfully, move into the right spot
|
||||
Path dstPath = fs.commitStoreFile(getColumnFamilyName(), path);
|
||||
Path dstPath = getRegionFileSystem().commitStoreFile(getColumnFamilyName(), path);
|
||||
|
||||
status.setStatus("Flushing " + this + ": reopening flushed file");
|
||||
HStoreFile sf = createStoreFileAndReader(dstPath);
|
||||
|
@ -1181,12 +1171,13 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
|
|||
boolean shouldDropBehind, long totalCompactedFilesSize, String fileStoragePolicy)
|
||||
throws IOException {
|
||||
// creating new cache config for each new writer
|
||||
final CacheConfig cacheConf = getCacheConfig();
|
||||
final CacheConfig writerCacheConf = new CacheConfig(cacheConf);
|
||||
if (isCompaction) {
|
||||
// Don't cache data on write on compactions, unless specifically configured to do so
|
||||
// Cache only when total file size remains lower than configured threshold
|
||||
final boolean cacheCompactedBlocksOnWrite =
|
||||
cacheConf.shouldCacheCompactedBlocksOnWrite();
|
||||
getCacheConfig().shouldCacheCompactedBlocksOnWrite();
|
||||
// if data blocks are to be cached on write
|
||||
// during compaction, we should forcefully
|
||||
// cache index and bloom blocks as well
|
||||
|
@ -1220,53 +1211,48 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
|
|||
}
|
||||
}
|
||||
}
|
||||
InetSocketAddress[] favoredNodes = null;
|
||||
if (region.getRegionServerServices() != null) {
|
||||
favoredNodes = region.getRegionServerServices().getFavoredNodesForRegion(
|
||||
region.getRegionInfo().getEncodedName());
|
||||
}
|
||||
Encryption.Context encryptionContext = storeContext.getEncryptionContext();
|
||||
HFileContext hFileContext = createFileContext(compression, includeMVCCReadpoint, includesTag,
|
||||
cryptoContext);
|
||||
Path familyTempDir = new Path(fs.getTempDir(), family.getNameAsString());
|
||||
StoreFileWriter.Builder builder = new StoreFileWriter.Builder(conf, writerCacheConf,
|
||||
this.getFileSystem())
|
||||
.withOutputDir(familyTempDir)
|
||||
.withBloomType(family.getBloomFilterType())
|
||||
.withMaxKeyCount(maxKeyCount)
|
||||
.withFavoredNodes(favoredNodes)
|
||||
.withFileContext(hFileContext)
|
||||
.withShouldDropCacheBehind(shouldDropBehind)
|
||||
.withCompactedFilesSupplier(this::getCompactedFiles)
|
||||
.withFileStoragePolicy(fileStoragePolicy);
|
||||
encryptionContext);
|
||||
Path familyTempDir = new Path(getRegionFileSystem().getTempDir(), getColumnFamilyName());
|
||||
StoreFileWriter.Builder builder =
|
||||
new StoreFileWriter.Builder(conf, writerCacheConf, getFileSystem())
|
||||
.withOutputDir(familyTempDir)
|
||||
.withBloomType(storeContext.getBloomFilterType())
|
||||
.withMaxKeyCount(maxKeyCount)
|
||||
.withFavoredNodes(storeContext.getFavoredNodes())
|
||||
.withFileContext(hFileContext)
|
||||
.withShouldDropCacheBehind(shouldDropBehind)
|
||||
.withCompactedFilesSupplier(storeContext.getCompactedFilesSupplier())
|
||||
.withFileStoragePolicy(fileStoragePolicy);
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
private HFileContext createFileContext(Compression.Algorithm compression,
|
||||
boolean includeMVCCReadpoint, boolean includesTag, Encryption.Context cryptoContext) {
|
||||
boolean includeMVCCReadpoint, boolean includesTag, Encryption.Context encryptionContext) {
|
||||
if (compression == null) {
|
||||
compression = HFile.DEFAULT_COMPRESSION_ALGORITHM;
|
||||
}
|
||||
ColumnFamilyDescriptor family = getColumnFamilyDescriptor();
|
||||
HFileContext hFileContext = new HFileContextBuilder()
|
||||
.withIncludesMvcc(includeMVCCReadpoint)
|
||||
.withIncludesTags(includesTag)
|
||||
.withCompression(compression)
|
||||
.withCompressTags(family.isCompressTags())
|
||||
.withChecksumType(checksumType)
|
||||
.withBytesPerCheckSum(bytesPerChecksum)
|
||||
.withBlockSize(blocksize)
|
||||
.withHBaseCheckSum(true)
|
||||
.withDataBlockEncoding(family.getDataBlockEncoding())
|
||||
.withEncryptionContext(cryptoContext)
|
||||
.withCreateTime(EnvironmentEdgeManager.currentTime())
|
||||
.withColumnFamily(family.getName())
|
||||
.withTableName(region.getTableDescriptor()
|
||||
.getTableName().getName())
|
||||
.withCellComparator(this.comparator)
|
||||
.build();
|
||||
.withIncludesMvcc(includeMVCCReadpoint)
|
||||
.withIncludesTags(includesTag)
|
||||
.withCompression(compression)
|
||||
.withCompressTags(family.isCompressTags())
|
||||
.withChecksumType(StoreUtils.getChecksumType(conf))
|
||||
.withBytesPerCheckSum(StoreUtils.getBytesPerChecksum(conf))
|
||||
.withBlockSize(family.getBlocksize())
|
||||
.withHBaseCheckSum(true)
|
||||
.withDataBlockEncoding(family.getDataBlockEncoding())
|
||||
.withEncryptionContext(encryptionContext)
|
||||
.withCreateTime(EnvironmentEdgeManager.currentTime())
|
||||
.withColumnFamily(getColumnFamilyDescriptor().getName())
|
||||
.withTableName(getTableName().getName())
|
||||
.withCellComparator(getComparator())
|
||||
.build();
|
||||
return hFileContext;
|
||||
}
|
||||
|
||||
|
||||
private long getTotalSize(Collection<HStoreFile> sfs) {
|
||||
return sfs.stream().mapToLong(sf -> sf.getReader().length()).sum();
|
||||
}
|
||||
|
@ -1543,7 +1529,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
|
|||
|
||||
// Ready to go. Have list of files to compact.
|
||||
LOG.info("Starting compaction of " + filesToCompact +
|
||||
" into tmpdir=" + fs.getTempDir() + ", totalSize=" +
|
||||
" into tmpdir=" + getRegionFileSystem().getTempDir() + ", totalSize=" +
|
||||
TraditionalBinaryPrefix.long2String(cr.getSize(), "", 1));
|
||||
|
||||
return doCompaction(cr, filesToCompact, user, compactionStartTime,
|
||||
|
@ -1593,7 +1579,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
|
|||
String prefix = HConstants.STORAGE_POLICY_PREFIX;
|
||||
for (Path newFile : newFiles) {
|
||||
if (newFile.getParent().getName().startsWith(prefix)) {
|
||||
CommonFSUtils.setStoragePolicy(fs.getFileSystem(), newFile,
|
||||
CommonFSUtils.setStoragePolicy(getRegionFileSystem().getFileSystem(), newFile,
|
||||
newFile.getParent().getName().substring(prefix.length()));
|
||||
}
|
||||
}
|
||||
|
@ -1618,7 +1604,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
|
|||
HStoreFile moveFileIntoPlace(Path newFile) throws IOException {
|
||||
validateStoreFile(newFile);
|
||||
// Move the file into the right spot
|
||||
Path destPath = fs.commitStoreFile(getColumnFamilyName(), newFile);
|
||||
Path destPath = getRegionFileSystem().commitStoreFile(getColumnFamilyName(), newFile);
|
||||
return createStoreFileAndReader(destPath);
|
||||
}
|
||||
|
||||
|
@ -1638,8 +1624,8 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
|
|||
newFiles.stream().map(HStoreFile::getPath).collect(Collectors.toList());
|
||||
RegionInfo info = this.region.getRegionInfo();
|
||||
CompactionDescriptor compactionDescriptor = ProtobufUtil.toCompactionDescriptor(info,
|
||||
family.getName(), inputPaths, outputPaths,
|
||||
fs.getStoreDir(getColumnFamilyDescriptor().getNameAsString()));
|
||||
getColumnFamilyDescriptor().getName(), inputPaths, outputPaths,
|
||||
getRegionFileSystem().getStoreDir(getColumnFamilyDescriptor().getNameAsString()));
|
||||
// Fix reaching into Region to get the maxWaitForSeqId.
|
||||
// Does this method belong in Region altogether given it is making so many references up there?
|
||||
// Could be Region#writeCompactionMarker(compactionDescriptor);
|
||||
|
@ -1766,7 +1752,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
|
|||
String familyName = this.getColumnFamilyName();
|
||||
Set<String> inputFiles = new HashSet<>();
|
||||
for (String compactionInput : compactionInputs) {
|
||||
Path inputPath = fs.getStoreFilePath(familyName, compactionInput);
|
||||
Path inputPath = getRegionFileSystem().getStoreFilePath(familyName, compactionInput);
|
||||
inputFiles.add(inputPath.getName());
|
||||
}
|
||||
|
||||
|
@ -1786,7 +1772,8 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
|
|||
compactionOutputs.remove(sf.getPath().getName());
|
||||
}
|
||||
for (String compactionOutput : compactionOutputs) {
|
||||
StoreFileInfo storeFileInfo = fs.getStoreFileInfo(getColumnFamilyName(), compactionOutput);
|
||||
StoreFileInfo storeFileInfo =
|
||||
getRegionFileSystem().getStoreFileInfo(getColumnFamilyName(), compactionOutput);
|
||||
HStoreFile storeFile = createStoreFileAndReader(storeFileInfo);
|
||||
outputStoreFiles.add(storeFile);
|
||||
}
|
||||
|
@ -2106,7 +2093,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
|
|||
throw new IllegalArgumentException("Number of versions must be > 0");
|
||||
}
|
||||
// Make sure we do not return more than maximum versions for this store.
|
||||
int maxVersions = this.family.getMaxVersions();
|
||||
int maxVersions = getColumnFamilyDescriptor().getMaxVersions();
|
||||
return wantedVersions > maxVersions ? maxVersions: wantedVersions;
|
||||
}
|
||||
|
||||
|
@ -2381,7 +2368,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
|
|||
|
||||
@Override
|
||||
public RegionInfo getRegionInfo() {
|
||||
return this.fs.getRegionInfo();
|
||||
return getRegionFileSystem().getRegionInfo();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -2523,7 +2510,8 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
|
|||
List<HStoreFile> storeFiles = new ArrayList<>(fileNames.size());
|
||||
for (String file : fileNames) {
|
||||
// open the file as a store file (hfile link, etc)
|
||||
StoreFileInfo storeFileInfo = fs.getStoreFileInfo(getColumnFamilyName(), file);
|
||||
StoreFileInfo storeFileInfo =
|
||||
getRegionFileSystem().getStoreFileInfo(getColumnFamilyName(), file);
|
||||
HStoreFile storeFile = createStoreFileAndReader(storeFileInfo);
|
||||
storeFiles.add(storeFile);
|
||||
HStore.this.storeSize.addAndGet(storeFile.getReader().length());
|
||||
|
@ -2573,7 +2561,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
|
|||
* @return cache configuration for this Store.
|
||||
*/
|
||||
public CacheConfig getCacheConfig() {
|
||||
return this.cacheConf;
|
||||
return storeContext.getCacheConf();
|
||||
}
|
||||
|
||||
public static final long FIXED_OVERHEAD = ClassSize.estimateBase(HStore.class, false);
|
||||
|
@ -2587,12 +2575,12 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
|
|||
@Override
|
||||
public long heapSize() {
|
||||
MemStoreSize memstoreSize = this.memstore.size();
|
||||
return DEEP_OVERHEAD + memstoreSize.getHeapSize();
|
||||
return DEEP_OVERHEAD + memstoreSize.getHeapSize() + storeContext.heapSize();
|
||||
}
|
||||
|
||||
@Override
|
||||
public CellComparator getComparator() {
|
||||
return comparator;
|
||||
return storeContext.getComparator();
|
||||
}
|
||||
|
||||
public ScanInfo getScanInfo() {
|
||||
|
@ -2666,7 +2654,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
|
|||
public void onConfigurationChange(Configuration conf) {
|
||||
this.conf = new CompoundConfiguration()
|
||||
.add(conf)
|
||||
.addBytesMap(family.getValues());
|
||||
.addBytesMap(getColumnFamilyDescriptor().getValues());
|
||||
this.storeEngine.compactionPolicy.setConf(conf);
|
||||
this.offPeakHours = OffPeakHours.getInstance(conf);
|
||||
}
|
||||
|
@ -2798,8 +2786,8 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
|
|||
LOG.debug("Moving the files {} to archive", filesToRemove);
|
||||
// Only if this is successful it has to be removed
|
||||
try {
|
||||
this.fs.removeStoreFiles(this.getColumnFamilyDescriptor().getNameAsString(),
|
||||
filesToRemove);
|
||||
getRegionFileSystem()
|
||||
.removeStoreFiles(this.getColumnFamilyDescriptor().getNameAsString(), filesToRemove);
|
||||
} catch (FailedArchiveException fae) {
|
||||
// Even if archiving some files failed, we still need to clear out any of the
|
||||
// files which were successfully archived. Otherwise we will receive a
|
||||
|
|
|
@ -0,0 +1,194 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.Collection;
|
||||
import java.util.function.Supplier;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.CellComparator;
|
||||
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
|
||||
import org.apache.hadoop.hbase.io.HeapSize;
|
||||
import org.apache.hadoop.hbase.io.crypto.Encryption;
|
||||
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
|
||||
import org.apache.hadoop.hbase.util.ClassSize;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* This carries the immutable information and references on some of the meta data about the HStore.
|
||||
* This meta data can be used across the HFileWriter/Readers and other HStore consumers without the
|
||||
* need of passing around the complete store.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public final class StoreContext implements HeapSize {
|
||||
public static final long FIXED_OVERHEAD = ClassSize.estimateBase(HStore.class, false);
|
||||
|
||||
private final int blockSize;
|
||||
private final Encryption.Context encryptionContext;
|
||||
private final CacheConfig cacheConf;
|
||||
private final HRegionFileSystem regionFileSystem;
|
||||
private final CellComparator comparator;
|
||||
private final BloomType bloomFilterType;
|
||||
private final Supplier<Collection<HStoreFile>> compactedFilesSupplier;
|
||||
private final Supplier<InetSocketAddress[]> favoredNodesSupplier;
|
||||
private final ColumnFamilyDescriptor family;
|
||||
private final Path familyStoreDirectoryPath;
|
||||
private final RegionCoprocessorHost coprocessorHost;
|
||||
|
||||
private StoreContext(Builder builder) {
|
||||
this.blockSize = builder.blockSize;
|
||||
this.encryptionContext = builder.encryptionContext;
|
||||
this.cacheConf = builder.cacheConf;
|
||||
this.regionFileSystem = builder.regionFileSystem;
|
||||
this.comparator = builder.comparator;
|
||||
this.bloomFilterType = builder.bloomFilterType;
|
||||
this.compactedFilesSupplier = builder.compactedFilesSupplier;
|
||||
this.favoredNodesSupplier = builder.favoredNodesSupplier;
|
||||
this.family = builder.family;
|
||||
this.familyStoreDirectoryPath = builder.familyStoreDirectoryPath;
|
||||
this.coprocessorHost = builder.coprocessorHost;
|
||||
}
|
||||
|
||||
public int getBlockSize() {
|
||||
return blockSize;
|
||||
}
|
||||
|
||||
public Encryption.Context getEncryptionContext() {
|
||||
return encryptionContext;
|
||||
}
|
||||
|
||||
public CacheConfig getCacheConf() {
|
||||
return cacheConf;
|
||||
}
|
||||
|
||||
public HRegionFileSystem getRegionFileSystem() {
|
||||
return regionFileSystem;
|
||||
}
|
||||
|
||||
public CellComparator getComparator() {
|
||||
return comparator;
|
||||
}
|
||||
|
||||
public BloomType getBloomFilterType() {
|
||||
return bloomFilterType;
|
||||
}
|
||||
|
||||
public Supplier<Collection<HStoreFile>> getCompactedFilesSupplier() {
|
||||
return compactedFilesSupplier;
|
||||
}
|
||||
|
||||
public InetSocketAddress[] getFavoredNodes() {
|
||||
return favoredNodesSupplier.get();
|
||||
}
|
||||
|
||||
public ColumnFamilyDescriptor getFamily() {
|
||||
return family;
|
||||
}
|
||||
|
||||
public Path getFamilyStoreDirectoryPath() {
|
||||
return familyStoreDirectoryPath;
|
||||
}
|
||||
|
||||
public RegionCoprocessorHost getCoprocessorHost() {
|
||||
return coprocessorHost;
|
||||
}
|
||||
|
||||
public static Builder getBuilder() {
|
||||
return new Builder();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long heapSize() {
|
||||
return FIXED_OVERHEAD;
|
||||
}
|
||||
|
||||
public static class Builder {
|
||||
private int blockSize;
|
||||
private Encryption.Context encryptionContext;
|
||||
private CacheConfig cacheConf;
|
||||
private HRegionFileSystem regionFileSystem;
|
||||
private CellComparator comparator;
|
||||
private BloomType bloomFilterType;
|
||||
private Supplier<Collection<HStoreFile>> compactedFilesSupplier;
|
||||
private Supplier<InetSocketAddress[]> favoredNodesSupplier;
|
||||
private ColumnFamilyDescriptor family;
|
||||
private Path familyStoreDirectoryPath;
|
||||
private RegionCoprocessorHost coprocessorHost;
|
||||
|
||||
public Builder withBlockSize(int blockSize) {
|
||||
this.blockSize = blockSize;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder withEncryptionContext(Encryption.Context encryptionContext) {
|
||||
this.encryptionContext = encryptionContext;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder withCacheConfig(CacheConfig cacheConf) {
|
||||
this.cacheConf = cacheConf;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder withRegionFileSystem(HRegionFileSystem regionFileSystem) {
|
||||
this.regionFileSystem = regionFileSystem;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder withCellComparator(CellComparator comparator) {
|
||||
this.comparator = comparator;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder withBloomType(BloomType bloomFilterType) {
|
||||
this.bloomFilterType = bloomFilterType;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder withCompactedFilesSupplier(Supplier<Collection<HStoreFile>>
|
||||
compactedFilesSupplier) {
|
||||
this.compactedFilesSupplier = compactedFilesSupplier;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder withFavoredNodesSupplier(Supplier<InetSocketAddress[]> favoredNodesSupplier) {
|
||||
this.favoredNodesSupplier = favoredNodesSupplier;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder withColumnFamilyDescriptor(ColumnFamilyDescriptor family) {
|
||||
this.family = family;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder withFamilyStoreDirectoryPath(Path familyStoreDirectoryPath) {
|
||||
this.familyStoreDirectoryPath = familyStoreDirectoryPath;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder withRegionCoprocessorHost(RegionCoprocessorHost coprocessorHost) {
|
||||
this.coprocessorHost = coprocessorHost;
|
||||
return this;
|
||||
}
|
||||
|
||||
public StoreContext build() {
|
||||
return new StoreContext(this);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -24,9 +24,13 @@ import java.util.Optional;
|
|||
import java.util.OptionalInt;
|
||||
import java.util.OptionalLong;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.CellComparator;
|
||||
import org.apache.hadoop.hbase.CellUtil;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFile;
|
||||
import org.apache.hadoop.hbase.util.ChecksumType;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -136,4 +140,25 @@ public class StoreUtils {
|
|||
return largestFile.isPresent() ? StoreUtils.getFileSplitPoint(largestFile.get(), comparator)
|
||||
: Optional.empty();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the configured checksum algorithm.
|
||||
* @param conf The configuration
|
||||
* @return The checksum algorithm that is set in the configuration
|
||||
*/
|
||||
public static ChecksumType getChecksumType(Configuration conf) {
|
||||
return ChecksumType.nameToType(
|
||||
conf.get(HConstants.CHECKSUM_TYPE_NAME, ChecksumType.getDefaultChecksumType().getName()));
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the configured bytesPerChecksum value.
|
||||
* @param conf The configuration
|
||||
* @return The bytesPerChecksum that is set in the configuration
|
||||
*/
|
||||
public static int getBytesPerChecksum(Configuration conf) {
|
||||
return conf.getInt(HConstants.BYTES_PER_CHECKSUM,
|
||||
HFile.DEFAULT_BYTES_PER_CHECKSUM);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -89,6 +89,7 @@ import org.apache.hadoop.hbase.regionserver.BloomType;
|
|||
import org.apache.hadoop.hbase.regionserver.HStore;
|
||||
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
|
||||
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
|
||||
import org.apache.hadoop.hbase.regionserver.StoreUtils;
|
||||
import org.apache.hadoop.hbase.security.UserProvider;
|
||||
import org.apache.hadoop.hbase.security.token.FsDelegationToken;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
|
@ -1165,8 +1166,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
|
|||
Algorithm compression = familyDescriptor.getCompressionType();
|
||||
BloomType bloomFilterType = familyDescriptor.getBloomFilterType();
|
||||
HFileContext hFileContext = new HFileContextBuilder().withCompression(compression)
|
||||
.withChecksumType(HStore.getChecksumType(conf))
|
||||
.withBytesPerCheckSum(HStore.getBytesPerChecksum(conf)).withBlockSize(blocksize)
|
||||
.withChecksumType(StoreUtils.getChecksumType(conf))
|
||||
.withBytesPerCheckSum(StoreUtils.getBytesPerChecksum(conf)).withBlockSize(blocksize)
|
||||
.withDataBlockEncoding(familyDescriptor.getDataBlockEncoding()).withIncludesTags(true)
|
||||
.build();
|
||||
halfWriter = new StoreFileWriter.Builder(conf, cacheConf, fs).withFilePath(outFile)
|
||||
|
|
|
@ -39,8 +39,8 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
|
|||
import org.apache.hadoop.hbase.io.hfile.HFileContext;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
|
||||
import org.apache.hadoop.hbase.regionserver.CellSet;
|
||||
import org.apache.hadoop.hbase.regionserver.HStore;
|
||||
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
|
||||
import org.apache.hadoop.hbase.regionserver.StoreUtils;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.wal.EntryBuffers.RegionEntryBuffer;
|
||||
import org.apache.hadoop.hbase.wal.WAL.Entry;
|
||||
|
@ -199,8 +199,8 @@ public class BoundedRecoveredHFilesOutputSink extends OutputSink {
|
|||
new StoreFileWriter.Builder(walSplitter.conf, CacheConfig.DISABLED, walSplitter.rootFS)
|
||||
.withOutputDir(outputDir);
|
||||
HFileContext hFileContext = new HFileContextBuilder().
|
||||
withChecksumType(HStore.getChecksumType(walSplitter.conf)).
|
||||
withBytesPerCheckSum(HStore.getBytesPerChecksum(walSplitter.conf)).
|
||||
withChecksumType(StoreUtils.getChecksumType(walSplitter.conf)).
|
||||
withBytesPerCheckSum(StoreUtils.getBytesPerChecksum(walSplitter.conf)).
|
||||
withCellComparator(isMetaTable?
|
||||
MetaCellComparator.META_COMPARATOR: CellComparatorImpl.COMPARATOR).build();
|
||||
return writerBuilder.withFileContext(hFileContext).build();
|
||||
|
|
|
@ -62,6 +62,7 @@ import org.apache.hadoop.hbase.regionserver.ImmutableSegment;
|
|||
import org.apache.hadoop.hbase.regionserver.MemStoreCompactor;
|
||||
import org.apache.hadoop.hbase.regionserver.MutableSegment;
|
||||
import org.apache.hadoop.hbase.regionserver.Segment;
|
||||
import org.apache.hadoop.hbase.regionserver.StoreContext;
|
||||
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker.NonSyncTimeRangeTracker;
|
||||
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker.SyncTimeRangeTracker;
|
||||
import org.apache.hadoop.hbase.regionserver.throttle.StoreHotnessProtector;
|
||||
|
@ -606,7 +607,7 @@ public class TestHeapSize {
|
|||
@Test
|
||||
public void testAutoCalcFixedOverHead() {
|
||||
Class[] classList = new Class[] { HFileContext.class, HRegion.class, BlockCacheKey.class,
|
||||
HFileBlock.class, HStore.class, LruBlockCache.class };
|
||||
HFileBlock.class, HStore.class, LruBlockCache.class, StoreContext.class };
|
||||
for (Class cl : classList) {
|
||||
// do estimate in advance to ensure class is loaded
|
||||
ClassSize.estimateBase(cl, false);
|
||||
|
|
|
@ -7638,7 +7638,7 @@ public class TestHRegion {
|
|||
LOG.warn("hbase.hstore.compaction.complete is set to false");
|
||||
List<HStoreFile> sfs = new ArrayList<>(newFiles.size());
|
||||
final boolean evictOnClose =
|
||||
cacheConf != null? cacheConf.shouldEvictOnClose(): true;
|
||||
getCacheConfig() != null? getCacheConfig().shouldEvictOnClose(): true;
|
||||
for (Path newFile : newFiles) {
|
||||
// Create storefile around what we wrote with a reader on it.
|
||||
HStoreFile sf = createStoreFileAndReader(newFile);
|
||||
|
|
|
@ -222,8 +222,8 @@ public class TestSecureBulkLoadManager {
|
|||
.withIncludesTags(true)
|
||||
.withCompression(compression)
|
||||
.withCompressTags(family.isCompressTags())
|
||||
.withChecksumType(HStore.getChecksumType(conf))
|
||||
.withBytesPerCheckSum(HStore.getBytesPerChecksum(conf))
|
||||
.withChecksumType(StoreUtils.getChecksumType(conf))
|
||||
.withBytesPerCheckSum(StoreUtils.getBytesPerChecksum(conf))
|
||||
.withBlockSize(family.getBlocksize())
|
||||
.withHBaseCheckSum(true)
|
||||
.withDataBlockEncoding(family.getDataBlockEncoding())
|
||||
|
|
Loading…
Reference in New Issue