HBASE-26064 Introduce a StoreFileTracker to abstract the store file tracking logic

Signed-off-by: Wellington Chevreuil <wchevreuil@apache.org>

Conflicts:
	hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
	hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java
	hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
	hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultStoreEngine.java
	hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScannerClosure.java
	hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreEngine.java
	hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDateTieredCompactor.java
	hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactor.java
This commit is contained in:
Duo Zhang 2021-07-29 18:35:19 +08:00 committed by Andrew Purtell
parent 64a2e9c088
commit 073656bf88
36 changed files with 1278 additions and 742 deletions

View File

@ -22,7 +22,6 @@ import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
@ -80,17 +79,16 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
};
private final CellSinkFactory<StoreFileWriter> writerFactory =
new CellSinkFactory<StoreFileWriter>() {
@Override
public StoreFileWriter createWriter(InternalScanner scanner,
org.apache.hadoop.hbase.regionserver.compactions.Compactor.FileDetails fd,
boolean shouldDropBehind, boolean major) throws IOException {
// make this writer with tags always because of possible new cells with tags.
return store.createWriterInTmp(fd.maxKeyCount,
major ? majorCompactionCompression : minorCompactionCompression, true, true, true,
shouldDropBehind);
}
};
new CellSinkFactory<StoreFileWriter>() {
@Override
public StoreFileWriter createWriter(InternalScanner scanner,
org.apache.hadoop.hbase.regionserver.compactions.Compactor.FileDetails fd,
boolean shouldDropBehind, boolean major) throws IOException {
// make this writer with tags always because of possible new cells with tags.
return store.getStoreEngine().createWriter(
createParams(fd, shouldDropBehind, major).includeMVCCReadpoint(true).includesTag(true));
}
};
public DefaultMobStoreCompactor(Configuration conf, HStore store) {
super(conf, store);

View File

@ -23,7 +23,6 @@ import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
@ -115,8 +114,7 @@ public class DefaultMobStoreFlusher extends DefaultStoreFlusher {
synchronized (flushLock) {
status.setStatus("Flushing " + store + ": creating writer");
// Write the map out to the disk
writer = store.createWriterInTmp(cellsCount, store.getColumnFamilyDescriptor().getCompressionType(),
false, true, true, false);
writer = createWriter(snapshot, true);
IOException e = null;
try {
// It's a mob store, flush the cells in a mob way. This is the difference of flushing

View File

@ -0,0 +1,134 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
public final class CreateStoreFileWriterParams {
private long maxKeyCount;
private Compression.Algorithm compression;
private boolean isCompaction;
private boolean includeMVCCReadpoint;
private boolean includesTag;
private boolean shouldDropBehind;
private long totalCompactedFilesSize = -1;
private String fileStoragePolicy = HConstants.EMPTY_STRING;
private CreateStoreFileWriterParams() {
}
public long maxKeyCount() {
return maxKeyCount;
}
public CreateStoreFileWriterParams maxKeyCount(long maxKeyCount) {
this.maxKeyCount = maxKeyCount;
return this;
}
public Compression.Algorithm compression() {
return compression;
}
/**
* Set the compression algorithm to use
*/
public CreateStoreFileWriterParams compression(Compression.Algorithm compression) {
this.compression = compression;
return this;
}
public boolean isCompaction() {
return isCompaction;
}
/**
* Whether we are creating a new file in a compaction
*/
public CreateStoreFileWriterParams isCompaction(boolean isCompaction) {
this.isCompaction = isCompaction;
return this;
}
public boolean includeMVCCReadpoint() {
return includeMVCCReadpoint;
}
/**
* Whether to include MVCC or not
*/
public CreateStoreFileWriterParams includeMVCCReadpoint(boolean includeMVCCReadpoint) {
this.includeMVCCReadpoint = includeMVCCReadpoint;
return this;
}
public boolean includesTag() {
return includesTag;
}
/**
* Whether to includesTag or not
*/
public CreateStoreFileWriterParams includesTag(boolean includesTag) {
this.includesTag = includesTag;
return this;
}
public boolean shouldDropBehind() {
return shouldDropBehind;
}
public CreateStoreFileWriterParams shouldDropBehind(boolean shouldDropBehind) {
this.shouldDropBehind = shouldDropBehind;
return this;
}
public long totalCompactedFilesSize() {
return totalCompactedFilesSize;
}
public CreateStoreFileWriterParams totalCompactedFilesSize(long totalCompactedFilesSize) {
this.totalCompactedFilesSize = totalCompactedFilesSize;
return this;
}
public String fileStoragePolicy() {
return fileStoragePolicy;
}
public CreateStoreFileWriterParams fileStoragePolicy(String fileStoragePolicy) {
this.fileStoragePolicy = fileStoragePolicy;
return this;
}
public static CreateStoreFileWriterParams create() {
return new CreateStoreFileWriterParams();
}
}

View File

@ -19,18 +19,17 @@ package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
import org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactionPolicy;
import org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactionRequest;
import org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactor;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User;
import org.apache.yetus.audience.InterfaceAudience;
/**
* HBASE-15400 This store engine allows us to store data in date tiered layout with exponential

View File

@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellComparator;
@ -39,8 +38,8 @@ import org.apache.yetus.audience.InterfaceAudience;
* their derivatives.
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public class DefaultStoreEngine extends StoreEngine<
DefaultStoreFlusher, RatioBasedCompactionPolicy, DefaultCompactor, DefaultStoreFileManager> {
public class DefaultStoreEngine extends StoreEngine<DefaultStoreFlusher,
RatioBasedCompactionPolicy, DefaultCompactor, DefaultStoreFileManager> {
public static final String DEFAULT_STORE_FLUSHER_CLASS_KEY =
"hbase.hstore.defaultengine.storeflusher.class";

View File

@ -21,15 +21,14 @@ package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Default implementation of StoreFlusher.
@ -60,9 +59,7 @@ public class DefaultStoreFlusher extends StoreFlusher {
synchronized (flushLock) {
status.setStatus("Flushing " + store + ": creating writer");
// Write the map out to the disk
writer = store.createWriterInTmp(cellsCount,
store.getColumnFamilyDescriptor().getCompressionType(), false, true,
snapshot.isTagsPresent(), false);
writer = createWriter(snapshot, false);
IOException e = null;
try {
performFlush(scanner, writer, throughputController);

View File

@ -27,7 +27,6 @@ import java.util.NavigableSet;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@ -158,7 +157,7 @@ public class HMobStore extends HStore {
protected StoreEngine<?, ?, ?, ?> createStoreEngine(HStore store, Configuration conf,
CellComparator cellComparator) throws IOException {
MobStoreEngine engine = new MobStoreEngine();
engine.createComponents(conf, store, cellComparator);
engine.createComponentsOnce(conf, store, cellComparator);
return engine;
}

View File

@ -145,7 +145,7 @@ public class HRegionFileSystem {
// Temp Helpers
// ===========================================================================
/** @return {@link Path} to the region's temp directory, used for file creations */
Path getTempDir() {
public Path getTempDir() {
return new Path(getRegionDir(), REGION_TEMP_DIR);
}
@ -240,11 +240,7 @@ public class HRegionFileSystem {
* @param familyName Column Family Name
* @return a set of {@link StoreFileInfo} for the specified family.
*/
public Collection<StoreFileInfo> getStoreFiles(final byte[] familyName) throws IOException {
return getStoreFiles(Bytes.toString(familyName));
}
public Collection<StoreFileInfo> getStoreFiles(final String familyName) throws IOException {
public List<StoreFileInfo> getStoreFiles(final String familyName) throws IOException {
return getStoreFiles(familyName, true);
}
@ -254,7 +250,7 @@ public class HRegionFileSystem {
* @param familyName Column Family Name
* @return a set of {@link StoreFileInfo} for the specified family.
*/
public Collection<StoreFileInfo> getStoreFiles(final String familyName, final boolean validate)
public List<StoreFileInfo> getStoreFiles(final String familyName, final boolean validate)
throws IOException {
Path familyDir = getStoreDir(familyName);
FileStatus[] files = CommonFSUtils.listStatus(this.fs, familyDir);

View File

@ -23,6 +23,7 @@ import java.util.function.Supplier;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.crypto.Encryption;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
@ -108,6 +109,14 @@ public final class StoreContext implements HeapSize {
return coprocessorHost;
}
public RegionInfo getRegionInfo() {
return regionFileSystem.getRegionInfo();
}
public boolean isPrimaryReplicaStore() {
return getRegionInfo().getReplicaId() == RegionInfo.DEFAULT_REPLICA_ID;
}
public static Builder getBuilder() {
return new Builder();
}

View File

@ -19,38 +19,131 @@
package org.apache.hadoop.hbase.regionserver;
import com.google.errorprone.annotations.RestrictedApi;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionPolicy;
import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
/**
* StoreEngine is a factory that can create the objects necessary for HStore to operate.
* Since not all compaction policies, compactors and store file managers are compatible,
* they are tied together and replaced together via StoreEngine-s.
* StoreEngine is a factory that can create the objects necessary for HStore to operate. Since not
* all compaction policies, compactors and store file managers are compatible, they are tied
* together and replaced together via StoreEngine-s.
* <p/>
* We expose read write lock methods to upper layer for store operations:<br/>
* <ul>
* <li>Locked in shared mode when the list of component stores is looked at:
* <ul>
* <li>all reads/writes to table data</li>
* <li>checking for split</li>
* </ul>
* </li>
* <li>Locked in exclusive mode when the list of component stores is modified:
* <ul>
* <li>closing</li>
* <li>completing a compaction</li>
* </ul>
* </li>
* </ul>
* <p/>
* It is a bit confusing that we have a StoreFileManager(SFM) and then a StoreFileTracker(SFT). As
* its name says, SFT is used to track the store files list. The reason why we have a SFT beside SFM
* is that, when introducing stripe compaction, we introduced the StoreEngine and also the SFM, but
* actually, the SFM here is not a general 'Manager', it is only designed to manage the in memory
* 'stripes', so we can select different store files when scanning or compacting. The 'tracking' of
* store files is actually done in {@link org.apache.hadoop.hbase.regionserver.HRegionFileSystem}
* and {@link HStore} before we have SFT. And since SFM is designed to only holds in memory states,
* we will hold write lock when updating it, the lock is also used to protect the normal read/write
* requests. This means we'd better not add IO operations to SFM. And also, no matter what the in
* memory state is, stripe or not, it does not effect how we track the store files. So consider all
* these facts, here we introduce a separated SFT to track the store files.
* <p/>
* Here, since we always need to update SFM and SFT almost at the same time, we introduce methods in
* StoreEngine directly to update them both, so upper layer just need to update StoreEngine once, to
* reduce the possible misuse.
*/
@InterfaceAudience.Private
public abstract class StoreEngine<SF extends StoreFlusher,
CP extends CompactionPolicy, C extends Compactor, SFM extends StoreFileManager> {
public abstract class StoreEngine<SF extends StoreFlusher, CP extends CompactionPolicy,
C extends Compactor, SFM extends StoreFileManager> {
private static final Logger LOG = LoggerFactory.getLogger(StoreEngine.class);
protected SF storeFlusher;
protected CP compactionPolicy;
protected C compactor;
protected SFM storeFileManager;
private Configuration conf;
private StoreContext ctx;
private RegionCoprocessorHost coprocessorHost;
private Function<String, ExecutorService> openStoreFileThreadPoolCreator;
private StoreFileTracker storeFileTracker;
private final ReadWriteLock storeLock = new ReentrantReadWriteLock();
/**
* The name of the configuration parameter that specifies the class of
* a store engine that is used to manage and compact HBase store files.
* The name of the configuration parameter that specifies the class of a store engine that is used
* to manage and compact HBase store files.
*/
public static final String STORE_ENGINE_CLASS_KEY = "hbase.hstore.engine.class";
private static final Class<? extends StoreEngine<?, ?, ?, ?>>
DEFAULT_STORE_ENGINE_CLASS = DefaultStoreEngine.class;
private static final Class<? extends StoreEngine<?, ?, ?, ?>> DEFAULT_STORE_ENGINE_CLASS =
DefaultStoreEngine.class;
/**
* Acquire read lock of this store.
*/
public void readLock() {
storeLock.readLock().lock();
}
/**
* Release read lock of this store.
*/
public void readUnlock() {
storeLock.readLock().unlock();
}
/**
* Acquire write lock of this store.
*/
public void writeLock() {
storeLock.writeLock().lock();
}
/**
* Release write lock of this store.
*/
public void writeUnlock() {
storeLock.writeLock().unlock();
}
/**
* @return Compaction policy to use.
@ -80,6 +173,11 @@ public abstract class StoreEngine<SF extends StoreFlusher,
return this.storeFlusher;
}
private StoreFileTracker createStoreFileTracker(HStore store) {
return StoreFileTrackerFactory.create(store.conf, store.getRegionInfo().getTable(),
store.isPrimaryReplicaStore(), store.getStoreContext());
}
/**
* @param filesCompacting Files currently compacting
* @return whether a compaction selection is possible
@ -87,8 +185,8 @@ public abstract class StoreEngine<SF extends StoreFlusher,
public abstract boolean needsCompaction(List<HStoreFile> filesCompacting);
/**
* Creates an instance of a compaction context specific to this engine.
* Doesn't actually select or start a compaction. See CompactionContext class comment.
* Creates an instance of a compaction context specific to this engine. Doesn't actually select or
* start a compaction. See CompactionContext class comment.
* @return New CompactionContext object.
*/
public abstract CompactionContext createCompaction() throws IOException;
@ -96,36 +194,347 @@ public abstract class StoreEngine<SF extends StoreFlusher,
/**
* Create the StoreEngine's components.
*/
protected abstract void createComponents(
Configuration conf, HStore store, CellComparator cellComparator) throws IOException;
protected abstract void createComponents(Configuration conf, HStore store,
CellComparator cellComparator) throws IOException;
private void createComponentsOnce(
Configuration conf, HStore store, CellComparator cellComparator) throws IOException {
assert compactor == null && compactionPolicy == null
&& storeFileManager == null && storeFlusher == null;
protected final void createComponentsOnce(Configuration conf, HStore store,
CellComparator cellComparator) throws IOException {
assert compactor == null && compactionPolicy == null && storeFileManager == null &&
storeFlusher == null && storeFileTracker == null;
createComponents(conf, store, cellComparator);
assert compactor != null && compactionPolicy != null
&& storeFileManager != null && storeFlusher != null;
this.conf = conf;
this.ctx = store.getStoreContext();
this.coprocessorHost = store.getHRegion().getCoprocessorHost();
this.openStoreFileThreadPoolCreator = store.getHRegion()::getStoreFileOpenAndCloseThreadPool;
this.storeFileTracker = createStoreFileTracker(store);
assert compactor != null && compactionPolicy != null && storeFileManager != null &&
storeFlusher != null && storeFileTracker != null;
}
/**
* Create a writer for writing new store files.
* @return Writer for a new StoreFile
*/
public StoreFileWriter createWriter(CreateStoreFileWriterParams params) throws IOException {
return storeFileTracker.createWriter(params);
}
public HStoreFile createStoreFileAndReader(Path p) throws IOException {
StoreFileInfo info = new StoreFileInfo(conf, ctx.getRegionFileSystem().getFileSystem(), p,
ctx.isPrimaryReplicaStore());
return createStoreFileAndReader(info);
}
public HStoreFile createStoreFileAndReader(StoreFileInfo info) throws IOException {
info.setRegionCoprocessorHost(coprocessorHost);
HStoreFile storeFile =
new HStoreFile(info, ctx.getFamily().getBloomFilterType(), ctx.getCacheConf());
storeFile.initReader();
return storeFile;
}
/**
* Validates a store file by opening and closing it. In HFileV2 this should not be an expensive
* operation.
* @param path the path to the store file
*/
public void validateStoreFile(Path path) throws IOException {
HStoreFile storeFile = null;
try {
storeFile = createStoreFileAndReader(path);
} catch (IOException e) {
LOG.error("Failed to open store file : {}, keeping it in tmp location", path, e);
throw e;
} finally {
if (storeFile != null) {
storeFile.closeStoreFile(false);
}
}
}
private List<HStoreFile> openStoreFiles(Collection<StoreFileInfo> files, boolean warmup)
throws IOException {
if (CollectionUtils.isEmpty(files)) {
return Collections.emptyList();
}
// initialize the thread pool for opening store files in parallel..
ExecutorService storeFileOpenerThreadPool =
openStoreFileThreadPoolCreator.apply("StoreFileOpener-" +
ctx.getRegionInfo().getEncodedName() + "-" + ctx.getFamily().getNameAsString());
CompletionService<HStoreFile> completionService =
new ExecutorCompletionService<>(storeFileOpenerThreadPool);
int totalValidStoreFile = 0;
for (StoreFileInfo storeFileInfo : files) {
// The StoreFileInfo will carry store configuration down to HFile, we need to set it to
// our store's CompoundConfiguration here.
storeFileInfo.setConf(conf);
// open each store file in parallel
completionService.submit(() -> createStoreFileAndReader(storeFileInfo));
totalValidStoreFile++;
}
Set<String> compactedStoreFiles = new HashSet<>();
ArrayList<HStoreFile> results = new ArrayList<>(files.size());
IOException ioe = null;
try {
for (int i = 0; i < totalValidStoreFile; i++) {
try {
HStoreFile storeFile = completionService.take().get();
if (storeFile != null) {
LOG.debug("loaded {}", storeFile);
results.add(storeFile);
compactedStoreFiles.addAll(storeFile.getCompactedStoreFiles());
}
} catch (InterruptedException e) {
if (ioe == null) {
ioe = new InterruptedIOException(e.getMessage());
}
} catch (ExecutionException e) {
if (ioe == null) {
ioe = new IOException(e.getCause());
}
}
}
} finally {
storeFileOpenerThreadPool.shutdownNow();
}
if (ioe != null) {
// close StoreFile readers
boolean evictOnClose =
ctx.getCacheConf() != null ? ctx.getCacheConf().shouldEvictOnClose() : true;
for (HStoreFile file : results) {
try {
if (file != null) {
file.closeStoreFile(evictOnClose);
}
} catch (IOException e) {
LOG.warn("Could not close store file {}", file, e);
}
}
throw ioe;
}
// Should not archive the compacted store files when region warmup. See HBASE-22163.
if (!warmup) {
// Remove the compacted files from result
List<HStoreFile> filesToRemove = new ArrayList<>(compactedStoreFiles.size());
for (HStoreFile storeFile : results) {
if (compactedStoreFiles.contains(storeFile.getPath().getName())) {
LOG.warn("Clearing the compacted storefile {} from {}", storeFile, this);
storeFile.getReader().close(
storeFile.getCacheConf() != null ? storeFile.getCacheConf().shouldEvictOnClose() :
true);
filesToRemove.add(storeFile);
}
}
results.removeAll(filesToRemove);
if (!filesToRemove.isEmpty() && ctx.isPrimaryReplicaStore()) {
LOG.debug("Moving the files {} to archive", filesToRemove);
ctx.getRegionFileSystem().removeStoreFiles(ctx.getFamily().getNameAsString(),
filesToRemove);
}
}
return results;
}
public void initialize(boolean warmup) throws IOException {
List<StoreFileInfo> fileInfos = storeFileTracker.load();
List<HStoreFile> files = openStoreFiles(fileInfos, warmup);
storeFileManager.loadFiles(files);
}
public void refreshStoreFiles() throws IOException {
List<StoreFileInfo> fileInfos = storeFileTracker.load();
refreshStoreFilesInternal(fileInfos);
}
public void refreshStoreFiles(Collection<String> newFiles) throws IOException {
List<StoreFileInfo> storeFiles = new ArrayList<>(newFiles.size());
for (String file : newFiles) {
storeFiles
.add(ctx.getRegionFileSystem().getStoreFileInfo(ctx.getFamily().getNameAsString(), file));
}
refreshStoreFilesInternal(storeFiles);
}
/**
* Checks the underlying store files, and opens the files that have not been opened, and removes
* the store file readers for store files no longer available. Mainly used by secondary region
* replicas to keep up to date with the primary region files.
*/
private void refreshStoreFilesInternal(Collection<StoreFileInfo> newFiles) throws IOException {
Collection<HStoreFile> currentFiles = storeFileManager.getStorefiles();
Collection<HStoreFile> compactedFiles = storeFileManager.getCompactedfiles();
if (currentFiles == null) {
currentFiles = Collections.emptySet();
}
if (newFiles == null) {
newFiles = Collections.emptySet();
}
if (compactedFiles == null) {
compactedFiles = Collections.emptySet();
}
HashMap<StoreFileInfo, HStoreFile> currentFilesSet = new HashMap<>(currentFiles.size());
for (HStoreFile sf : currentFiles) {
currentFilesSet.put(sf.getFileInfo(), sf);
}
HashMap<StoreFileInfo, HStoreFile> compactedFilesSet = new HashMap<>(compactedFiles.size());
for (HStoreFile sf : compactedFiles) {
compactedFilesSet.put(sf.getFileInfo(), sf);
}
Set<StoreFileInfo> newFilesSet = new HashSet<StoreFileInfo>(newFiles);
// Exclude the files that have already been compacted
newFilesSet = Sets.difference(newFilesSet, compactedFilesSet.keySet());
Set<StoreFileInfo> toBeAddedFiles = Sets.difference(newFilesSet, currentFilesSet.keySet());
Set<StoreFileInfo> toBeRemovedFiles = Sets.difference(currentFilesSet.keySet(), newFilesSet);
if (toBeAddedFiles.isEmpty() && toBeRemovedFiles.isEmpty()) {
return;
}
LOG.info("Refreshing store files for " + this + " files to add: " + toBeAddedFiles +
" files to remove: " + toBeRemovedFiles);
Set<HStoreFile> toBeRemovedStoreFiles = new HashSet<>(toBeRemovedFiles.size());
for (StoreFileInfo sfi : toBeRemovedFiles) {
toBeRemovedStoreFiles.add(currentFilesSet.get(sfi));
}
// try to open the files
List<HStoreFile> openedFiles = openStoreFiles(toBeAddedFiles, false);
// propogate the file changes to the underlying store file manager
replaceStoreFiles(toBeRemovedStoreFiles, openedFiles); // won't throw an exception
}
/**
* Commit the given {@code files}.
* <p/>
* We will move the file into data directory, and open it.
* @param files the files want to commit
* @param validate whether to validate the store files
* @return the committed store files
*/
public List<HStoreFile> commitStoreFiles(List<Path> files, boolean validate) throws IOException {
List<HStoreFile> committedFiles = new ArrayList<>(files.size());
HRegionFileSystem hfs = ctx.getRegionFileSystem();
String familyName = ctx.getFamily().getNameAsString();
Path storeDir = hfs.getStoreDir(familyName);
for (Path file : files) {
try {
if (validate) {
validateStoreFile(file);
}
Path committedPath;
// As we want to support writing to data directory directly, here we need to check whether
// the store file is already in the right place
if (file.getParent() != null && file.getParent().equals(storeDir)) {
// already in the right place, skip renmaing
committedPath = file;
} else {
// Write-out finished successfully, move into the right spot
committedPath = hfs.commitStoreFile(familyName, file);
}
HStoreFile sf = createStoreFileAndReader(committedPath);
committedFiles.add(sf);
} catch (IOException e) {
LOG.error("Failed to commit store file {}", file, e);
// Try to delete the files we have committed before.
// It is OK to fail when deleting as leaving the file there does not cause any data
// corruption problem. It just introduces some duplicated data which may impact read
// performance a little when reading before compaction.
for (HStoreFile sf : committedFiles) {
Path pathToDelete = sf.getPath();
try {
sf.deleteStoreFile();
} catch (IOException deleteEx) {
LOG.warn(HBaseMarkers.FATAL, "Failed to delete committed store file {}", pathToDelete,
deleteEx);
}
}
throw new IOException("Failed to commit the flush", e);
}
}
return committedFiles;
}
@FunctionalInterface
public interface IOExceptionRunnable {
void run() throws IOException;
}
/**
* Add the store files to store file manager, and also record it in the store file tracker.
* <p/>
* The {@code actionAfterAdding} will be executed after the insertion to store file manager, under
* the lock protection. Usually this is for clear the memstore snapshot.
*/
public void addStoreFiles(Collection<HStoreFile> storeFiles,
IOExceptionRunnable actionAfterAdding) throws IOException {
storeFileTracker.add(StoreUtils.toStoreFileInfo(storeFiles));
writeLock();
try {
storeFileManager.insertNewFiles(storeFiles);
actionAfterAdding.run();
} finally {
// We need the lock, as long as we are updating the storeFiles
// or changing the memstore. Let us release it before calling
// notifyChangeReadersObservers. See HBASE-4485 for a possible
// deadlock scenario that could have happened if continue to hold
// the lock.
writeUnlock();
}
}
public void replaceStoreFiles(Collection<HStoreFile> compactedFiles,
Collection<HStoreFile> newFiles) throws IOException {
storeFileTracker.replace(StoreUtils.toStoreFileInfo(compactedFiles),
StoreUtils.toStoreFileInfo(newFiles));
writeLock();
try {
storeFileManager.addCompactionResults(compactedFiles, newFiles);
} finally {
writeUnlock();
}
}
public void removeCompactedFiles(Collection<HStoreFile> compactedFiles) {
writeLock();
try {
storeFileManager.removeCompactedFiles(compactedFiles);
} finally {
writeUnlock();
}
}
/**
* Create the StoreEngine configured for the given Store.
* @param store The store. An unfortunate dependency needed due to it
* being passed to coprocessors via the compactor.
* @param store The store. An unfortunate dependency needed due to it being passed to coprocessors
* via the compactor.
* @param conf Store configuration.
* @param cellComparator CellComparator for storeFileManager.
* @return StoreEngine to use.
*/
public static StoreEngine<?, ?, ?, ?> create(
HStore store, Configuration conf, CellComparator cellComparator) throws IOException {
public static StoreEngine<?, ?, ?, ?> create(HStore store, Configuration conf,
CellComparator cellComparator) throws IOException {
String className = conf.get(STORE_ENGINE_CLASS_KEY, DEFAULT_STORE_ENGINE_CLASS.getName());
try {
StoreEngine<?,?,?,?> se = ReflectionUtils.instantiateWithCustomCtor(
className, new Class[] { }, new Object[] { });
StoreEngine<?, ?, ?, ?> se =
ReflectionUtils.instantiateWithCustomCtor(className, new Class[] {}, new Object[] {});
se.createComponentsOnce(conf, store, cellComparator);
return se;
} catch (Exception e) {
throw new IOException("Unable to load configured store engine '" + className + "'", e);
}
}
@RestrictedApi(explanation = "Should only be called in TestHStore", link = "",
allowedOnPath = ".*/TestHStore.java")
ReadWriteLock getLock() {
return storeLock;
}
}

View File

@ -18,6 +18,7 @@
*/
package org.apache.hadoop.hbase.regionserver;
import com.google.errorprone.annotations.RestrictedApi;
import java.io.IOException;
import java.util.Collection;
import java.util.Comparator;
@ -49,12 +50,16 @@ public interface StoreFileManager {
* Loads the initial store files into empty StoreFileManager.
* @param storeFiles The files to load.
*/
@RestrictedApi(explanation = "Should only be called in StoreEngine", link = "",
allowedOnPath = ".*(/org/apache/hadoop/hbase/regionserver/StoreEngine.java|/src/test/.*)")
void loadFiles(List<HStoreFile> storeFiles);
/**
* Adds new files, either for from MemStore flush or bulk insert, into the structure.
* @param sfs New store files.
*/
@RestrictedApi(explanation = "Should only be called in StoreEngine", link = "",
allowedOnPath = ".*(/org/apache/hadoop/hbase/regionserver/StoreEngine.java|/src/test/.*)")
void insertNewFiles(Collection<HStoreFile> sfs);
/**
@ -62,12 +67,16 @@ public interface StoreFileManager {
* @param compactedFiles The input files for the compaction.
* @param results The resulting files for the compaction.
*/
@RestrictedApi(explanation = "Should only be called in StoreEngine", link = "",
allowedOnPath = ".*(/org/apache/hadoop/hbase/regionserver/StoreEngine.java|/src/test/.*)")
void addCompactionResults(Collection<HStoreFile> compactedFiles, Collection<HStoreFile> results);
/**
* Remove the compacted files
* @param compactedFiles the list of compacted files
*/
@RestrictedApi(explanation = "Should only be called in StoreEngine", link = "",
allowedOnPath = ".*(/org/apache/hadoop/hbase/regionserver/StoreEngine.java|/src/test/.*)")
void removeCompactedFiles(Collection<HStoreFile> compactedFiles);
/**

View File

@ -69,10 +69,17 @@ abstract class StoreFlusher {
writer.close();
}
protected final StoreFileWriter createWriter(MemStoreSnapshot snapshot, boolean alwaysIncludesTag)
throws IOException {
return store.getStoreEngine()
.createWriter(CreateStoreFileWriterParams.create().maxKeyCount(snapshot.getCellsCount())
.compression(store.getColumnFamilyDescriptor().getCompressionType()).isCompaction(false)
.includeMVCCReadpoint(true).includesTag(alwaysIncludesTag || snapshot.isTagsPresent())
.shouldDropBehind(false));
}
/**
* Creates the scanner for flushing snapshot. Also calls coprocessors.
* @param snapshotScanners
* @return The scanner; null if coprocessor is canceling the flush.
*/
protected final InternalScanner createScanner(List<KeyValueScanner> snapshotScanners,

View File

@ -20,10 +20,13 @@ package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.function.Predicate;
import java.util.function.ToLongFunction;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
@ -42,10 +45,13 @@ import org.slf4j.LoggerFactory;
* Utility functions for region server storage layer.
*/
@InterfaceAudience.Private
public class StoreUtils {
public final class StoreUtils {
private static final Logger LOG = LoggerFactory.getLogger(StoreUtils.class);
private StoreUtils() {
}
/**
* Creates a deterministic hash code for store file collection.
*/
@ -171,4 +177,31 @@ public class StoreUtils {
return new CompoundConfiguration().add(conf).addBytesMap(td.getValues())
.addStringMap(cfd.getConfiguration()).addBytesMap(cfd.getValues());
}
public static List<StoreFileInfo> toStoreFileInfo(Collection<HStoreFile> storefiles) {
return storefiles.stream().map(HStoreFile::getFileInfo).collect(Collectors.toList());
}
public static long getTotalUncompressedBytes(List<HStoreFile> files) {
return files.stream()
.mapToLong(file -> getStorefileFieldSize(file, StoreFileReader::getTotalUncompressedBytes))
.sum();
}
public static long getStorefilesSize(Collection<HStoreFile> files,
Predicate<HStoreFile> predicate) {
return files.stream().filter(predicate)
.mapToLong(file -> getStorefileFieldSize(file, StoreFileReader::length)).sum();
}
public static long getStorefileFieldSize(HStoreFile file, ToLongFunction<StoreFileReader> f) {
if (file == null) {
return 0L;
}
StoreFileReader reader = file.getReader();
if (reader == null) {
return 0L;
}
return f.applyAsLong(reader);
}
}

View File

@ -20,20 +20,19 @@ package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy;
import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactor;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;

View File

@ -70,7 +70,7 @@ public class StripeStoreFlusher extends StoreFlusher {
StripeMultiFileWriter mw = null;
try {
mw = req.createWriter(); // Writer according to the policy.
StripeMultiFileWriter.WriterFactory factory = createWriterFactory(cellsCount);
StripeMultiFileWriter.WriterFactory factory = createWriterFactory(snapshot);
StoreScanner storeScanner = (scanner instanceof StoreScanner) ? (StoreScanner)scanner : null;
mw.init(storeScanner, factory);
@ -98,13 +98,12 @@ public class StripeStoreFlusher extends StoreFlusher {
return result;
}
private StripeMultiFileWriter.WriterFactory createWriterFactory(final long kvCount) {
private StripeMultiFileWriter.WriterFactory createWriterFactory(MemStoreSnapshot snapshot) {
return new StripeMultiFileWriter.WriterFactory() {
@Override
public StoreFileWriter createWriter() throws IOException {
StoreFileWriter writer = store.createWriterInTmp(kvCount,
store.getColumnFamilyDescriptor().getCompressionType(), false, true, true, false);
return writer;
// XXX: it used to always pass true for includesTag, re-consider?
return StripeStoreFlusher.this.createWriter(snapshot, true);
}
};
}

View File

@ -51,13 +51,14 @@ public abstract class AbstractMultiOutputCompactor<T extends AbstractMultiFileWr
WriterFactory writerFactory = new WriterFactory() {
@Override
public StoreFileWriter createWriter() throws IOException {
return createTmpWriter(fd, shouldDropBehind, major);
return AbstractMultiOutputCompactor.this.createWriter(fd, shouldDropBehind, major);
}
@Override
public StoreFileWriter createWriterWithStoragePolicy(String fileStoragePolicy)
throws IOException {
return createTmpWriter(fd, shouldDropBehind, fileStoragePolicy, major);
throws IOException {
return AbstractMultiOutputCompactor.this.createWriter(fd, shouldDropBehind,
fileStoragePolicy, major);
}
};
// Prepare multi-writer, and perform the compaction using scanner and writer.

View File

@ -28,7 +28,6 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
@ -38,6 +37,7 @@ import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileInfo;
import org.apache.hadoop.hbase.regionserver.CellSink;
import org.apache.hadoop.hbase.regionserver.CreateStoreFileWriterParams;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
@ -60,6 +60,7 @@ import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
/**
@ -260,29 +261,32 @@ public abstract class Compactor<T extends CellSink> {
}
};
/**
* Creates a writer for a new file in a temporary directory.
* @param fd The file details.
* @return Writer for a new StoreFile in the tmp dir.
* @throws IOException if creation failed
*/
protected final StoreFileWriter createTmpWriter(FileDetails fd, boolean shouldDropBehind, boolean major)
throws IOException {
// When all MVCC readpoints are 0, don't write them.
// See HBASE-8166, HBASE-12600, and HBASE-13389.
return store.createWriterInTmp(fd.maxKeyCount,
major ? majorCompactionCompression : minorCompactionCompression,
true, fd.maxMVCCReadpoint > 0,
fd.maxTagsLength > 0, shouldDropBehind, fd.totalCompactedFilesSize,
HConstants.EMPTY_STRING);
protected final CreateStoreFileWriterParams createParams(FileDetails fd, boolean shouldDropBehind,
boolean major) {
return CreateStoreFileWriterParams.create().maxKeyCount(fd.maxKeyCount)
.compression(major ? majorCompactionCompression : minorCompactionCompression)
.isCompaction(true).includeMVCCReadpoint(fd.maxMVCCReadpoint > 0)
.includesTag(fd.maxTagsLength > 0).shouldDropBehind(shouldDropBehind)
.totalCompactedFilesSize(fd.totalCompactedFilesSize);
}
protected final StoreFileWriter createTmpWriter(FileDetails fd, boolean shouldDropBehind,
String fileStoragePolicy, boolean major) throws IOException {
return store.createWriterInTmp(fd.maxKeyCount,
major ? majorCompactionCompression : minorCompactionCompression,
true, fd.maxMVCCReadpoint > 0,
fd.maxTagsLength > 0, shouldDropBehind, fd.totalCompactedFilesSize, fileStoragePolicy);
/**
* Creates a writer for a new file.
* @param fd The file details.
* @return Writer for a new StoreFile
* @throws IOException if creation failed
*/
protected final StoreFileWriter createWriter(FileDetails fd, boolean shouldDropBehind,
boolean major) throws IOException {
// When all MVCC readpoints are 0, don't write them.
// See HBASE-8166, HBASE-12600, and HBASE-13389.
return store.getStoreEngine().createWriter(createParams(fd, shouldDropBehind, major));
}
protected final StoreFileWriter createWriter(FileDetails fd, boolean shouldDropBehind,
String fileStoragePolicy, boolean major) throws IOException {
return store.getStoreEngine()
.createWriter(createParams(fd, shouldDropBehind, major).fileStoragePolicy(fileStoragePolicy));
}
private ScanInfo preCompactScannerOpen(CompactionRequestImpl request, ScanType scanType,

View File

@ -45,14 +45,14 @@ public class DefaultCompactor extends Compactor<StoreFileWriter> {
}
private final CellSinkFactory<StoreFileWriter> writerFactory =
new CellSinkFactory<StoreFileWriter>() {
@Override
public StoreFileWriter createWriter(InternalScanner scanner,
org.apache.hadoop.hbase.regionserver.compactions.Compactor.FileDetails fd,
boolean shouldDropBehind, boolean major) throws IOException {
return createTmpWriter(fd, shouldDropBehind, major);
}
};
new CellSinkFactory<StoreFileWriter>() {
@Override
public StoreFileWriter createWriter(InternalScanner scanner,
org.apache.hadoop.hbase.regionserver.compactions.Compactor.FileDetails fd,
boolean shouldDropBehind, boolean major) throws IOException {
return DefaultCompactor.this.createWriter(fd, shouldDropBehind, major);
}
};
/**
* Do a minor/major compaction on an explicit set of storefiles from a Store.

View File

@ -0,0 +1,61 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.storefiletracker;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.StoreContext;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.yetus.audience.InterfaceAudience;
/**
* The default implementation for store file tracker, where we do not persist the store file list,
* and use listing when loading store files.
*/
@InterfaceAudience.Private
class DefaultStoreFileTracker extends StoreFileTrackerBase {
public DefaultStoreFileTracker(Configuration conf, TableName tableName, boolean isPrimaryReplica,
StoreContext ctx) {
super(conf, tableName, isPrimaryReplica, ctx);
}
@Override
public List<StoreFileInfo> load() throws IOException {
return ctx.getRegionFileSystem().getStoreFiles(ctx.getFamily().getNameAsString());
}
@Override
public boolean requireWritingToTmpDirFirst() {
return true;
}
@Override
protected void doAddNewStoreFiles(Collection<StoreFileInfo> newFiles) throws IOException {
// NOOP
}
@Override
protected void doAddCompactionResults(Collection<StoreFileInfo> compactedFiles,
Collection<StoreFileInfo> newFiles) throws IOException {
// NOOP
}
}

View File

@ -0,0 +1,75 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.storefiletracker;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import org.apache.hadoop.hbase.regionserver.CreateStoreFileWriterParams;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.yetus.audience.InterfaceAudience;
/**
* An interface to define how we track the store files for a give store.
* <p/>
* In the old time, we will write store to a tmp directory first, and then rename it to the actual
* data file. And once a store file is under data directory, we will consider it as 'committed'. And
* we need to do listing when loading store files.
* <p/>
* When cloud age is coming, now we want to store the store files on object storage, where rename
* and list are not as cheap as on HDFS, especially rename. Although introducing a metadata
* management layer for object storage could solve the problem, but we still want HBase to run on
* pure object storage, so here we introduce this interface to abstract how we track the store
* files. For the old implementation, we just persist nothing here, and do listing to load store
* files. When running on object storage, we could persist the store file list in a system region,
* or in a file on the object storage, to make it possible to write directly into the data directory
* to avoid renaming, and also avoid listing when loading store files.
* <p/>
* The implementation requires to be thread safe as flush and compaction may occur as the same time,
* and we could also do multiple compactions at the same time. As the implementation may choose to
* persist the store file list to external storage, which could be slow, it is the duty for the
* callers to not call it inside a lock which may block normal read/write requests.
*/
@InterfaceAudience.Private
public interface StoreFileTracker {
/**
* Load the store files list when opening a region.
*/
List<StoreFileInfo> load() throws IOException;
/**
* Add new store files.
* <p/>
* Used for flush and bulk load.
*/
void add(Collection<StoreFileInfo> newFiles) throws IOException;
/**
* Add new store files and remove compacted store files after compaction.
*/
void replace(Collection<StoreFileInfo> compactedFiles, Collection<StoreFileInfo> newFiles)
throws IOException;
/**
* Create a writer for writing new store files.
* @return Writer for a new StoreFile
*/
StoreFileWriter createWriter(CreateStoreFileWriterParams params) throws IOException;
}

View File

@ -0,0 +1,178 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.storefiletracker;
import java.io.IOException;
import java.util.Collection;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.crypto.Encryption;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.regionserver.CreateStoreFileWriterParams;
import org.apache.hadoop.hbase.regionserver.StoreContext;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.StoreUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Base class for all store file tracker.
* <p/>
* Mainly used to place the common logic to skip persistent for secondary replicas.
*/
@InterfaceAudience.Private
abstract class StoreFileTrackerBase implements StoreFileTracker {
private static final Logger LOG = LoggerFactory.getLogger(StoreFileTrackerBase.class);
protected final Configuration conf;
protected final TableName tableName;
protected final boolean isPrimaryReplica;
protected final StoreContext ctx;
private volatile boolean cacheOnWriteLogged;
protected StoreFileTrackerBase(Configuration conf, TableName tableName, boolean isPrimaryReplica,
StoreContext ctx) {
this.conf = conf;
this.tableName = tableName;
this.isPrimaryReplica = isPrimaryReplica;
this.ctx = ctx;
}
@Override
public final void add(Collection<StoreFileInfo> newFiles) throws IOException {
if (isPrimaryReplica) {
doAddNewStoreFiles(newFiles);
}
}
@Override
public final void replace(Collection<StoreFileInfo> compactedFiles,
Collection<StoreFileInfo> newFiles) throws IOException {
if (isPrimaryReplica) {
doAddCompactionResults(compactedFiles, newFiles);
}
}
private HFileContext createFileContext(Compression.Algorithm compression,
boolean includeMVCCReadpoint, boolean includesTag, Encryption.Context encryptionContext) {
if (compression == null) {
compression = HFile.DEFAULT_COMPRESSION_ALGORITHM;
}
ColumnFamilyDescriptor family = ctx.getFamily();
HFileContext hFileContext = new HFileContextBuilder().withIncludesMvcc(includeMVCCReadpoint)
.withIncludesTags(includesTag).withCompression(compression)
.withCompressTags(family.isCompressTags()).withChecksumType(StoreUtils.getChecksumType(conf))
.withBytesPerCheckSum(StoreUtils.getBytesPerChecksum(conf))
.withBlockSize(family.getBlocksize()).withHBaseCheckSum(true)
.withDataBlockEncoding(family.getDataBlockEncoding()).withEncryptionContext(encryptionContext)
.withCreateTime(EnvironmentEdgeManager.currentTime()).withColumnFamily(family.getName())
.withTableName(tableName.getName()).withCellComparator(ctx.getComparator()).build();
return hFileContext;
}
@Override
public final StoreFileWriter createWriter(CreateStoreFileWriterParams params)
throws IOException {
if (!isPrimaryReplica) {
throw new IllegalStateException("Should not call create writer on secondary replicas");
}
// creating new cache config for each new writer
final CacheConfig cacheConf = ctx.getCacheConf();
final CacheConfig writerCacheConf = new CacheConfig(cacheConf);
long totalCompactedFilesSize = params.totalCompactedFilesSize();
if (params.isCompaction()) {
// Don't cache data on write on compactions, unless specifically configured to do so
// Cache only when total file size remains lower than configured threshold
final boolean cacheCompactedBlocksOnWrite = cacheConf.shouldCacheCompactedBlocksOnWrite();
// if data blocks are to be cached on write
// during compaction, we should forcefully
// cache index and bloom blocks as well
if (cacheCompactedBlocksOnWrite &&
totalCompactedFilesSize <= cacheConf.getCacheCompactedBlocksOnWriteThreshold()) {
writerCacheConf.enableCacheOnWrite();
if (!cacheOnWriteLogged) {
LOG.info("For {} , cacheCompactedBlocksOnWrite is true, hence enabled " +
"cacheOnWrite for Data blocks, Index blocks and Bloom filter blocks", this);
cacheOnWriteLogged = true;
}
} else {
writerCacheConf.setCacheDataOnWrite(false);
if (totalCompactedFilesSize > cacheConf.getCacheCompactedBlocksOnWriteThreshold()) {
// checking condition once again for logging
LOG.debug(
"For {}, setting cacheCompactedBlocksOnWrite as false as total size of compacted " +
"files - {}, is greater than cacheCompactedBlocksOnWriteThreshold - {}",
this, totalCompactedFilesSize, cacheConf.getCacheCompactedBlocksOnWriteThreshold());
}
}
} else {
final boolean shouldCacheDataOnWrite = cacheConf.shouldCacheDataOnWrite();
if (shouldCacheDataOnWrite) {
writerCacheConf.enableCacheOnWrite();
if (!cacheOnWriteLogged) {
LOG.info("For {} , cacheDataOnWrite is true, hence enabled cacheOnWrite for " +
"Index blocks and Bloom filter blocks", this);
cacheOnWriteLogged = true;
}
}
}
Encryption.Context encryptionContext = ctx.getEncryptionContext();
HFileContext hFileContext = createFileContext(params.compression(),
params.includeMVCCReadpoint(), params.includesTag(), encryptionContext);
Path outputDir;
if (requireWritingToTmpDirFirst()) {
outputDir =
new Path(ctx.getRegionFileSystem().getTempDir(), ctx.getFamily().getNameAsString());
} else {
throw new UnsupportedOperationException("not supported yet");
}
StoreFileWriter.Builder builder =
new StoreFileWriter.Builder(conf, writerCacheConf, ctx.getRegionFileSystem().getFileSystem())
.withOutputDir(outputDir).withBloomType(ctx.getBloomFilterType())
.withMaxKeyCount(params.maxKeyCount()).withFavoredNodes(ctx.getFavoredNodes())
.withFileContext(hFileContext).withShouldDropCacheBehind(params.shouldDropBehind())
.withCompactedFilesSupplier(ctx.getCompactedFilesSupplier())
.withFileStoragePolicy(params.fileStoragePolicy());
return builder.build();
}
/**
* Whether the implementation of this tracker requires you to write to temp directory first, i.e,
* does not allow broken store files under the actual data directory.
*/
protected abstract boolean requireWritingToTmpDirFirst();
protected abstract void doAddNewStoreFiles(Collection<StoreFileInfo> newFiles) throws IOException;
protected abstract void doAddCompactionResults(Collection<StoreFileInfo> compactedFiles,
Collection<StoreFileInfo> newFiles) throws IOException;
}

View File

@ -0,0 +1,35 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.storefiletracker;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.StoreContext;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Factory method for creating store file tracker.
*/
@InterfaceAudience.Private
public final class StoreFileTrackerFactory {
public static StoreFileTracker create(Configuration conf, TableName tableName,
boolean isPrimaryReplica, StoreContext ctx) {
return new DefaultStoreFileTracker(conf, tableName, isPrimaryReplica, ctx);
}
}

View File

@ -101,7 +101,6 @@ class MajorCompactionRequest {
boolean shouldCFBeCompacted(HRegionFileSystem fileSystem, String family, long ts)
throws IOException {
// do we have any store files?
Collection<StoreFileInfo> storeFiles = fileSystem.getStoreFiles(family);
if (storeFiles == null) {

View File

@ -210,11 +210,13 @@ public class TestIOFencing {
@Override
protected void refreshStoreSizeAndTotalBytes() throws IOException {
try {
r.compactionsWaiting.countDown();
r.compactionsBlocked.await();
} catch (InterruptedException ex) {
throw new IOException(ex);
if (r != null) {
try {
r.compactionsWaiting.countDown();
r.compactionsBlocked.await();
} catch (InterruptedException ex) {
throw new IOException(ex);
}
}
super.refreshStoreSizeAndTotalBytes();
}

View File

@ -215,8 +215,10 @@ public class TestCacheOnWriteInSchema {
@Test
public void testCacheOnWriteInSchema() throws IOException {
// Write some random data into the store
StoreFileWriter writer = store.createWriterInTmp(Integer.MAX_VALUE,
HFile.DEFAULT_COMPRESSION_ALGORITHM, false, true, false, false);
StoreFileWriter writer = store.getStoreEngine()
.createWriter(CreateStoreFileWriterParams.create().maxKeyCount(Integer.MAX_VALUE)
.compression(HFile.DEFAULT_COMPRESSION_ALGORITHM).isCompaction(false)
.includeMVCCReadpoint(true).includesTag(false).shouldDropBehind(false));
writeStoreFile(writer);
writer.close();
// Verify the block types of interest were cached on write

View File

@ -21,7 +21,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellComparatorImpl;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
import org.apache.hadoop.hbase.regionserver.compactions.RatioBasedCompactionPolicy;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
@ -65,9 +65,12 @@ public class TestDefaultStoreEngine {
DummyCompactionPolicy.class.getName());
conf.set(DefaultStoreEngine.DEFAULT_STORE_FLUSHER_CLASS_KEY,
DummyStoreFlusher.class.getName());
HRegion mockRegion = Mockito.mock(HRegion.class);
HStore mockStore = Mockito.mock(HStore.class);
Mockito.when(mockStore.getRegionInfo()).thenReturn(HRegionInfo.FIRST_META_REGIONINFO);
StoreEngine<?, ?, ?, ?> se = StoreEngine.create(mockStore, conf, CellComparatorImpl.COMPARATOR);
Mockito.when(mockStore.getRegionInfo()).thenReturn(RegionInfoBuilder.FIRST_META_REGIONINFO);
Mockito.when(mockStore.getHRegion()).thenReturn(mockRegion);
StoreEngine<?, ?, ?, ?> se =
StoreEngine.create(mockStore, conf, CellComparatorImpl.COMPARATOR);
Assert.assertTrue(se instanceof DefaultStoreEngine);
Assert.assertTrue(se.getCompactionPolicy() instanceof DummyCompactionPolicy);
Assert.assertTrue(se.getStoreFlusher() instanceof DummyStoreFlusher);

View File

@ -5746,7 +5746,7 @@ public class TestHRegion {
Collection<HStoreFile> storeFiles = primaryRegion.getStore(families[0]).getStorefiles();
primaryRegion.getRegionFileSystem().removeStoreFiles(Bytes.toString(families[0]), storeFiles);
Collection<StoreFileInfo> storeFileInfos = primaryRegion.getRegionFileSystem()
.getStoreFiles(families[0]);
.getStoreFiles(Bytes.toString(families[0]));
Assert.assertTrue(storeFileInfos == null || storeFileInfos.isEmpty());
verifyData(secondaryRegion, 0, 1000, cq, families);
@ -7715,7 +7715,7 @@ public class TestHRegion {
getCacheConfig() != null? getCacheConfig().shouldEvictOnClose(): true;
for (Path newFile : newFiles) {
// Create storefile around what we wrote with a reader on it.
HStoreFile sf = createStoreFileAndReader(newFile);
HStoreFile sf = storeEngine.createStoreFileAndReader(newFile);
sf.closeStoreFile(evictOnClose);
sfs.add(sf);
}

View File

@ -53,8 +53,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.IntBinaryOperator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
@ -315,7 +315,7 @@ public class TestHStore {
/**
* Verify that compression and data block encoding are respected by the
* Store.createWriterInTmp() method, used on store flush.
* createWriter method, used on store flush.
*/
@Test
public void testCreateWriter() throws Exception {
@ -327,9 +327,11 @@ public class TestHStore {
.build();
init(name.getMethodName(), conf, hcd);
// Test createWriterInTmp()
StoreFileWriter writer =
store.createWriterInTmp(4, hcd.getCompressionType(), false, true, false, false);
// Test createWriter
StoreFileWriter writer = store.getStoreEngine()
.createWriter(CreateStoreFileWriterParams.create().maxKeyCount(4)
.compression(hcd.getCompressionType()).isCompaction(false).includeMVCCReadpoint(true)
.includesTag(false).shouldDropBehind(false));
Path path = writer.getPath();
writer.append(new KeyValue(row, family, qf1, Bytes.toBytes(1)));
writer.append(new KeyValue(row, family, qf2, Bytes.toBytes(2)));
@ -1027,19 +1029,19 @@ public class TestHStore {
// add one more file
addStoreFile();
HStore spiedStore = spy(store);
StoreEngine<?, ?, ?, ?> spiedStoreEngine = spy(store.getStoreEngine());
// call first time after files changed
spiedStore.refreshStoreFiles();
spiedStoreEngine.refreshStoreFiles();
assertEquals(2, this.store.getStorefilesCount());
verify(spiedStore, times(1)).replaceStoreFiles(any(), any());
verify(spiedStoreEngine, times(1)).replaceStoreFiles(any(), any());
// call second time
spiedStore.refreshStoreFiles();
spiedStoreEngine.refreshStoreFiles();
// ensure that replaceStoreFiles is not called, i.e, the times does not change, if files are not
// refreshed,
verify(spiedStore, times(1)).replaceStoreFiles(any(), any());
verify(spiedStoreEngine, times(1)).replaceStoreFiles(any(), any());
}
private long countMemStoreScanner(StoreScanner scanner) {
@ -1721,7 +1723,7 @@ public class TestHStore {
// Do compaction
MyThread thread = new MyThread(storeScanner);
thread.start();
store.replaceStoreFiles(actualStorefiles, actualStorefiles1);
store.replaceStoreFiles(actualStorefiles, actualStorefiles1, false);
thread.join();
KeyValueHeap heap2 = thread.getHeap();
assertFalse(heap.equals(heap2));
@ -1800,8 +1802,10 @@ public class TestHStore {
@Test
public void testHFileContextSetWithCFAndTable() throws Exception {
init(this.name.getMethodName());
StoreFileWriter writer = store.createWriterInTmp(10000L,
Compression.Algorithm.NONE, false, true, false, true);
StoreFileWriter writer = store.getStoreEngine()
.createWriter(CreateStoreFileWriterParams.create().maxKeyCount(10000L)
.compression(Compression.Algorithm.NONE).isCompaction(true).includeMVCCReadpoint(true)
.includesTag(false).shouldDropBehind(true));
HFileContext hFileContext = writer.getHFileWriter().getFileContext();
assertArrayEquals(family, hFileContext.getColumnFamily());
assertArrayEquals(table, hFileContext.getTableName());
@ -3348,7 +3352,8 @@ public class TestHStore {
int currentCount = clearSnapshotCounter.incrementAndGet();
if (currentCount == 1) {
try {
if (store.lock.isWriteLockedByCurrentThread()) {
if (((ReentrantReadWriteLock) store.getStoreEngine().getLock())
.isWriteLockedByCurrentThread()) {
shouldWait = false;
}
/**

View File

@ -244,7 +244,7 @@ public class TestRegionMergeTransactionOnCluster {
TEST_UTIL.getConfiguration(), fs, tabledir, mergedRegionInfo);
int count = 0;
for(ColumnFamilyDescriptor colFamily : columnFamilies) {
count += hrfs.getStoreFiles(colFamily.getName()).size();
count += hrfs.getStoreFiles(colFamily.getNameAsString()).size();
}
ADMIN.compactRegion(mergedRegionInfo.getRegionName());
// clean up the merged region store files
@ -253,7 +253,7 @@ public class TestRegionMergeTransactionOnCluster {
int newcount = 0;
while (EnvironmentEdgeManager.currentTime() < timeout) {
for(ColumnFamilyDescriptor colFamily : columnFamilies) {
newcount += hrfs.getStoreFiles(colFamily.getName()).size();
newcount += hrfs.getStoreFiles(colFamily.getNameAsString()).size();
}
if(newcount > count) {
break;
@ -272,7 +272,7 @@ public class TestRegionMergeTransactionOnCluster {
while (EnvironmentEdgeManager.currentTime() < timeout) {
int newcount1 = 0;
for(ColumnFamilyDescriptor colFamily : columnFamilies) {
newcount1 += hrfs.getStoreFiles(colFamily.getName()).size();
newcount1 += hrfs.getStoreFiles(colFamily.getNameAsString()).size();
}
if(newcount1 <= 1) {
break;

View File

@ -26,7 +26,6 @@ import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@ -98,7 +97,7 @@ public class TestStoreFileRefresherChore {
}
@Override
public Collection<StoreFileInfo> getStoreFiles(String familyName) throws IOException {
public List<StoreFileInfo> getStoreFiles(String familyName) throws IOException {
if (fail) {
throw new IOException("simulating FS failure");
}

View File

@ -30,7 +30,6 @@ import java.util.Random;
import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@ -62,6 +61,7 @@ import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This test tests whether parallel {@link StoreScanner#close()} and
* {@link StoreScanner#updateReaders(List, List)} works perfectly ensuring
@ -85,7 +85,6 @@ public class TestStoreScannerClosure {
private static CacheConfig cacheConf;
private static FileSystem fs;
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static String ROOT_DIR = TEST_UTIL.getDataTestDir("TestHFile").toString();
private ScanInfo scanInfo = new ScanInfo(CONF, CF, 0, Integer.MAX_VALUE, Long.MAX_VALUE,
KeepDeletedCells.FALSE, HConstants.DEFAULT_BLOCKSIZE, 0, CellComparator.getInstance(), false);
private final static byte[] fam = Bytes.toBytes("cf_1");
@ -127,13 +126,12 @@ public class TestStoreScannerClosure {
p.addColumn(fam, Bytes.toBytes("q1"), Bytes.toBytes("val"));
region.put(p);
HStore store = region.getStore(fam);
ReentrantReadWriteLock lock = store.lock;
// use the lock to manually get a new memstore scanner. this is what
// HStore#notifyChangedReadersObservers does under the lock.(lock is not needed here
//since it is just a testcase).
lock.readLock().lock();
store.getStoreEngine().readLock();
final List<KeyValueScanner> memScanners = store.memstore.getScanners(Long.MAX_VALUE);
lock.readLock().unlock();
store.getStoreEngine().readUnlock();
Thread closeThread = new Thread() {
public void run() {
// close should be completed

View File

@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.CellComparatorImpl;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy;
@ -118,8 +119,10 @@ public class TestStripeStoreEngine {
}
private static TestStoreEngine createEngine(Configuration conf) throws Exception {
HRegion region = mock(HRegion.class);
HStore store = mock(HStore.class);
when(store.getRegionInfo()).thenReturn(HRegionInfo.FIRST_META_REGIONINFO);
when(store.getRegionInfo()).thenReturn(RegionInfoBuilder.FIRST_META_REGIONINFO);
when(store.getHRegion()).thenReturn(region);
CellComparatorImpl kvComparator = mock(CellComparatorImpl.class);
return (TestStoreEngine)StoreEngine.create(store, conf, kvComparator);
}

View File

@ -22,9 +22,6 @@ import static org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.cre
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ -44,11 +41,14 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.regionserver.CreateStoreFileWriterParams;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.ScanInfo;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.StoreEngine;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.StoreUtils;
import org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.Scanner;
@ -93,6 +93,7 @@ public class TestDateTieredCompactor {
@Parameter
public boolean usePrivateReaders;
@SuppressWarnings({ "rawtypes", "unchecked" })
private DateTieredCompactor createCompactor(StoreFileWritersCapture writers,
final KeyValue[] input, List<HStoreFile> storefiles) throws Exception {
Configuration conf = HBaseConfiguration.create();
@ -107,11 +108,10 @@ public class TestDateTieredCompactor {
when(store.getScanInfo()).thenReturn(si);
when(store.areWritesEnabled()).thenReturn(true);
when(store.getFileSystem()).thenReturn(mock(FileSystem.class));
when(store.getRegionInfo()).thenReturn(new HRegionInfo(TABLE_NAME));
when(store.createWriterInTmp(anyLong(), any(), anyBoolean(),
anyBoolean(), anyBoolean(), anyBoolean())).thenAnswer(writers);
when(store.createWriterInTmp(anyLong(), any(), anyBoolean(),
anyBoolean(), anyBoolean(), anyBoolean(), anyLong(), anyString())).thenAnswer(writers);
when(store.getRegionInfo()).thenReturn(RegionInfoBuilder.newBuilder(TABLE_NAME).build());
StoreEngine storeEngine = mock(StoreEngine.class);
when(storeEngine.createWriter(any(CreateStoreFileWriterParams.class))).thenAnswer(writers);
when(store.getStoreEngine()).thenReturn(storeEngine);
when(store.getComparator()).thenReturn(CellComparatorImpl.COMPARATOR);
OptionalLong maxSequenceId = StoreUtils.getMaxSequenceIdInList(storefiles);
when(store.getMaxSequenceId()).thenReturn(maxSequenceId);

View File

@ -31,7 +31,6 @@ import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isNull;
@ -61,6 +60,7 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.CreateStoreFileWriterParams;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
@ -68,6 +68,7 @@ import org.apache.hadoop.hbase.regionserver.ScanInfo;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
import org.apache.hadoop.hbase.regionserver.StoreEngine;
import org.apache.hadoop.hbase.regionserver.StoreFileReader;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.StripeMultiFileWriter;
@ -903,12 +904,9 @@ public class TestStripeCompactionPolicy {
when(info.getRegionNameAsString()).thenReturn("testRegion");
when(store.getColumnFamilyDescriptor()).thenReturn(col);
when(store.getRegionInfo()).thenReturn(info);
when(
store.createWriterInTmp(anyLong(), any(), anyBoolean(),
anyBoolean(), anyBoolean(), anyBoolean())).thenAnswer(writers);
when(
store.createWriterInTmp(anyLong(), any(), anyBoolean(),
anyBoolean(), anyBoolean(), anyBoolean(), anyLong(), anyString())).thenAnswer(writers);
StoreEngine storeEngine = mock(StoreEngine.class);
when(storeEngine.createWriter(any(CreateStoreFileWriterParams.class))).thenAnswer(writers);
when(store.getStoreEngine()).thenReturn(storeEngine);
Configuration conf = HBaseConfiguration.create();
conf.setBoolean("hbase.regionserver.compaction.private.readers", usePrivateReaders);

View File

@ -21,9 +21,6 @@ import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.OPEN_K
import static org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.createDummyRequest;
import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ -42,10 +39,13 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.regionserver.CreateStoreFileWriterParams;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.ScanInfo;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.StoreEngine;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.Scanner;
import org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.StoreFileWritersCapture;
@ -192,6 +192,7 @@ public class TestStripeCompactor {
writers.verifyBoundaries(boundaries.toArray(new byte[][] {}));
}
@SuppressWarnings({ "rawtypes", "unchecked" })
private StripeCompactor createCompactor(StoreFileWritersCapture writers, KeyValue[] input)
throws Exception {
Configuration conf = HBaseConfiguration.create();
@ -206,11 +207,10 @@ public class TestStripeCompactor {
when(store.getScanInfo()).thenReturn(si);
when(store.areWritesEnabled()).thenReturn(true);
when(store.getFileSystem()).thenReturn(mock(FileSystem.class));
when(store.getRegionInfo()).thenReturn(new HRegionInfo(TABLE_NAME));
when(store.createWriterInTmp(anyLong(), any(), anyBoolean(),
anyBoolean(), anyBoolean(), anyBoolean())).thenAnswer(writers);
when(store.createWriterInTmp(anyLong(), any(), anyBoolean(),
anyBoolean(), anyBoolean(), anyBoolean(), anyLong(), anyString())).thenAnswer(writers);
when(store.getRegionInfo()).thenReturn(RegionInfoBuilder.newBuilder(TABLE_NAME).build());
StoreEngine storeEngine = mock(StoreEngine.class);
when(storeEngine.createWriter(any(CreateStoreFileWriterParams.class))).thenAnswer(writers);
when(store.getStoreEngine()).thenReturn(storeEngine);
when(store.getComparator()).thenReturn(CellComparatorImpl.COMPARATOR);
return new StripeCompactor(conf, store) {