HBASE-26064 Introduce a StoreFileTracker to abstract the store file tracking logic

Signed-off-by: Wellington Chevreuil <wchevreuil@apache.org>

parent fff59ac083
commit 04e1980dea
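The new StoreFileTracker type itself is not shown on this page, so the sketch below is only an inference from the call sites visible in the StoreEngine hunks further down (load on store open, add after a flush, replace after a compaction, createWriter for new files). It is a hypothetical outline, not the interface actually committed under regionserver/storefiletracker.

    // Hypothetical sketch of the tracker contract, inferred from how StoreEngine uses it below.
    interface StoreFileTrackerSketch {
      // current file list, consumed by StoreEngine.initialize() and refreshStoreFiles()
      List<StoreFileInfo> load() throws IOException;
      // record newly flushed files, called from StoreEngine.addStoreFiles()
      void add(Collection<StoreFileInfo> newFiles) throws IOException;
      // record a compaction result, called from StoreEngine.replaceStoreFiles()
      void replace(Collection<StoreFileInfo> compactedFiles,
          Collection<StoreFileInfo> newFiles) throws IOException;
      // create a writer for a new store file, used by StoreEngine.createWriter()
      StoreFileWriter createWriter(CreateStoreFileWriterParams params) throws IOException;
    }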
@@ -22,13 +22,12 @@ import java.io.InterruptedIOException;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.PrivateCellUtil;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.PrivateCellUtil;
 import org.apache.hadoop.hbase.regionserver.CellSink;
 import org.apache.hadoop.hbase.regionserver.HMobStore;
 import org.apache.hadoop.hbase.regionserver.HStore;
@@ -80,17 +79,16 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
   };
 
   private final CellSinkFactory<StoreFileWriter> writerFactory =
     new CellSinkFactory<StoreFileWriter>() {
       @Override
       public StoreFileWriter createWriter(InternalScanner scanner,
         org.apache.hadoop.hbase.regionserver.compactions.Compactor.FileDetails fd,
         boolean shouldDropBehind, boolean major) throws IOException {
         // make this writer with tags always because of possible new cells with tags.
-        return store.createWriterInTmp(fd.maxKeyCount,
-          major ? majorCompactionCompression : minorCompactionCompression, true, true, true,
-          shouldDropBehind);
+        return store.getStoreEngine().createWriter(
+          createParams(fd, shouldDropBehind, major).includeMVCCReadpoint(true).includesTag(true));
       }
     };
 
   public DefaultMobStoreCompactor(Configuration conf, HStore store) {
     super(conf, store);
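The createParams(fd, shouldDropBehind, major) call above refers to a helper added to Compactor whose hunk is cut off at the very end of this page. Purely as an illustration of the direction of the change, a helper of that shape would have to carry the values that used to be positional arguments of createWriterInTmp; the sketch below is hypothetical and not the committed implementation.

    // Hypothetical sketch only; the real helper is in the truncated Compactor hunk below.
    protected final CreateStoreFileWriterParams createParams(FileDetails fd,
        boolean shouldDropBehind, boolean major) {
      return CreateStoreFileWriterParams.create()
        .maxKeyCount(fd.maxKeyCount)
        .compression(major ? majorCompactionCompression : minorCompactionCompression)
        .isCompaction(true)
        .shouldDropBehind(shouldDropBehind);
    }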
@@ -23,7 +23,6 @@ import java.io.InterruptedIOException;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
@@ -115,8 +114,7 @@ public class DefaultMobStoreFlusher extends DefaultStoreFlusher {
     synchronized (flushLock) {
       status.setStatus("Flushing " + store + ": creating writer");
       // Write the map out to the disk
-      writer = store.createWriterInTmp(cellsCount, store.getColumnFamilyDescriptor().getCompressionType(),
-        false, true, true, false);
+      writer = createWriter(snapshot, true);
       IOException e = null;
       try {
         // It's a mob store, flush the cells in a mob way. This is the difference of flushing
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.yetus.audience.InterfaceAudience;
+
+@InterfaceAudience.Private
+public final class CreateStoreFileWriterParams {
+
+  private long maxKeyCount;
+
+  private Compression.Algorithm compression;
+
+  private boolean isCompaction;
+
+  private boolean includeMVCCReadpoint;
+
+  private boolean includesTag;
+
+  private boolean shouldDropBehind;
+
+  private long totalCompactedFilesSize = -1;
+
+  private String fileStoragePolicy = HConstants.EMPTY_STRING;
+
+  private CreateStoreFileWriterParams() {
+  }
+
+  public long maxKeyCount() {
+    return maxKeyCount;
+  }
+
+  public CreateStoreFileWriterParams maxKeyCount(long maxKeyCount) {
+    this.maxKeyCount = maxKeyCount;
+    return this;
+  }
+
+  public Compression.Algorithm compression() {
+    return compression;
+  }
+
+  /**
+   * Set the compression algorithm to use
+   */
+  public CreateStoreFileWriterParams compression(Compression.Algorithm compression) {
+    this.compression = compression;
+    return this;
+  }
+
+  public boolean isCompaction() {
+    return isCompaction;
+  }
+
+  /**
+   * Whether we are creating a new file in a compaction
+   */
+  public CreateStoreFileWriterParams isCompaction(boolean isCompaction) {
+    this.isCompaction = isCompaction;
+    return this;
+  }
+
+  public boolean includeMVCCReadpoint() {
+    return includeMVCCReadpoint;
+  }
+
+  /**
+   * Whether to include MVCC or not
+   */
+  public CreateStoreFileWriterParams includeMVCCReadpoint(boolean includeMVCCReadpoint) {
+    this.includeMVCCReadpoint = includeMVCCReadpoint;
+    return this;
+  }
+
+  public boolean includesTag() {
+    return includesTag;
+  }
+
+  /**
+   * Whether to includesTag or not
+   */
+  public CreateStoreFileWriterParams includesTag(boolean includesTag) {
+    this.includesTag = includesTag;
+    return this;
+  }
+
+  public boolean shouldDropBehind() {
+    return shouldDropBehind;
+  }
+
+  public CreateStoreFileWriterParams shouldDropBehind(boolean shouldDropBehind) {
+    this.shouldDropBehind = shouldDropBehind;
+    return this;
+  }
+
+  public long totalCompactedFilesSize() {
+    return totalCompactedFilesSize;
+  }
+
+  public CreateStoreFileWriterParams totalCompactedFilesSize(long totalCompactedFilesSize) {
+    this.totalCompactedFilesSize = totalCompactedFilesSize;
+    return this;
+  }
+
+  public String fileStoragePolicy() {
+    return fileStoragePolicy;
+  }
+
+  public CreateStoreFileWriterParams fileStoragePolicy(String fileStoragePolicy) {
+    this.fileStoragePolicy = fileStoragePolicy;
+    return this;
+  }
+
+  public static CreateStoreFileWriterParams create() {
+    return new CreateStoreFileWriterParams();
+  }
+
+}
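A minimal usage sketch of the new builder, adapted from the flush-side helper added to StoreFlusher later on this page; any field left unset keeps the defaults declared above (totalCompactedFilesSize of -1 and an empty fileStoragePolicy).

    // Sketch only, mirroring the StoreFlusher hunk further down.
    StoreFileWriter writer = store.getStoreEngine().createWriter(
      CreateStoreFileWriterParams.create()
        .maxKeyCount(snapshot.getCellsCount())
        .compression(store.getColumnFamilyDescriptor().getCompressionType())
        .isCompaction(false)
        .includeMVCCReadpoint(true)
        .includesTag(snapshot.isTagsPresent())
        .shouldDropBehind(false));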
@@ -19,18 +19,17 @@ package org.apache.hadoop.hbase.regionserver;
 import java.io.IOException;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.CellComparator;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
-import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
 import org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactionPolicy;
 import org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactionRequest;
 import org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactor;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.security.User;
+import org.apache.yetus.audience.InterfaceAudience;
 
 /**
  * HBASE-15400 This store engine allows us to store data in date tiered layout with exponential
@@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
 import java.util.List;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.CellComparator;
@@ -39,8 +38,8 @@ import org.apache.yetus.audience.InterfaceAudience;
  * their derivatives.
  */
 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
-public class DefaultStoreEngine extends StoreEngine<
-    DefaultStoreFlusher, RatioBasedCompactionPolicy, DefaultCompactor, DefaultStoreFileManager> {
+public class DefaultStoreEngine extends StoreEngine<DefaultStoreFlusher,
+    RatioBasedCompactionPolicy, DefaultCompactor, DefaultStoreFileManager> {
 
   public static final String DEFAULT_STORE_FLUSHER_CLASS_KEY =
       "hbase.hstore.defaultengine.storeflusher.class";
@@ -21,15 +21,14 @@ package org.apache.hadoop.hbase.regionserver;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Default implementation of StoreFlusher.
@@ -60,9 +59,7 @@ public class DefaultStoreFlusher extends StoreFlusher {
     synchronized (flushLock) {
       status.setStatus("Flushing " + store + ": creating writer");
       // Write the map out to the disk
-      writer = store.createWriterInTmp(cellsCount,
-        store.getColumnFamilyDescriptor().getCompressionType(), false, true,
-        snapshot.isTagsPresent(), false);
+      writer = createWriter(snapshot, false);
       IOException e = null;
       try {
         performFlush(scanner, writer, throughputController);
@@ -27,7 +27,6 @@ import java.util.NavigableSet;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -158,7 +157,7 @@ public class HMobStore extends HStore {
   protected StoreEngine<?, ?, ?, ?> createStoreEngine(HStore store, Configuration conf,
       CellComparator cellComparator) throws IOException {
     MobStoreEngine engine = new MobStoreEngine();
-    engine.createComponents(conf, store, cellComparator);
+    engine.createComponentsOnce(conf, store, cellComparator);
     return engine;
   }
 
@@ -145,7 +145,7 @@ public class HRegionFileSystem {
   // Temp Helpers
   // ===========================================================================
   /** @return {@link Path} to the region's temp directory, used for file creations */
-  Path getTempDir() {
+  public Path getTempDir() {
     return new Path(getRegionDir(), REGION_TEMP_DIR);
   }
 
@@ -240,11 +240,7 @@ public class HRegionFileSystem {
    * @param familyName Column Family Name
    * @return a set of {@link StoreFileInfo} for the specified family.
    */
-  public Collection<StoreFileInfo> getStoreFiles(final byte[] familyName) throws IOException {
-    return getStoreFiles(Bytes.toString(familyName));
-  }
-
-  public Collection<StoreFileInfo> getStoreFiles(final String familyName) throws IOException {
+  public List<StoreFileInfo> getStoreFiles(final String familyName) throws IOException {
     return getStoreFiles(familyName, true);
   }
 
@@ -254,7 +250,7 @@ public class HRegionFileSystem {
    * @param familyName Column Family Name
    * @return a set of {@link StoreFileInfo} for the specified family.
    */
-  public Collection<StoreFileInfo> getStoreFiles(final String familyName, final boolean validate)
+  public List<StoreFileInfo> getStoreFiles(final String familyName, final boolean validate)
       throws IOException {
     Path familyDir = getStoreDir(familyName);
     FileStatus[] files = CommonFSUtils.listStatus(this.fs, familyDir);
File diff suppressed because it is too large
@@ -23,6 +23,7 @@ import java.util.function.Supplier;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.io.HeapSize;
 import org.apache.hadoop.hbase.io.crypto.Encryption;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
@@ -108,6 +109,14 @@ public final class StoreContext implements HeapSize {
     return coprocessorHost;
   }
 
+  public RegionInfo getRegionInfo() {
+    return regionFileSystem.getRegionInfo();
+  }
+
+  public boolean isPrimaryReplicaStore() {
+    return getRegionInfo().getReplicaId() == RegionInfo.DEFAULT_REPLICA_ID;
+  }
+
   public static Builder getBuilder() {
     return new Builder();
   }
@@ -19,38 +19,131 @@
 
 package org.apache.hadoop.hbase.regionserver;
 
+import com.google.errorprone.annotations.RestrictedApi;
 import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Function;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.log.HBaseMarkers;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionPolicy;
 import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
+import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
+import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
 import org.apache.hadoop.hbase.util.ReflectionUtils;
 import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
+import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
 
 /**
- * StoreEngine is a factory that can create the objects necessary for HStore to operate.
- * Since not all compaction policies, compactors and store file managers are compatible,
- * they are tied together and replaced together via StoreEngine-s.
+ * StoreEngine is a factory that can create the objects necessary for HStore to operate. Since not
+ * all compaction policies, compactors and store file managers are compatible, they are tied
+ * together and replaced together via StoreEngine-s.
+ * <p/>
+ * We expose read write lock methods to upper layer for store operations:<br/>
+ * <ul>
+ * <li>Locked in shared mode when the list of component stores is looked at:
+ * <ul>
+ * <li>all reads/writes to table data</li>
+ * <li>checking for split</li>
+ * </ul>
+ * </li>
+ * <li>Locked in exclusive mode when the list of component stores is modified:
+ * <ul>
+ * <li>closing</li>
+ * <li>completing a compaction</li>
+ * </ul>
+ * </li>
+ * </ul>
+ * <p/>
+ * It is a bit confusing that we have a StoreFileManager(SFM) and then a StoreFileTracker(SFT). As
+ * its name says, SFT is used to track the store files list. The reason why we have a SFT beside SFM
+ * is that, when introducing stripe compaction, we introduced the StoreEngine and also the SFM, but
+ * actually, the SFM here is not a general 'Manager', it is only designed to manage the in memory
+ * 'stripes', so we can select different store files when scanning or compacting. The 'tracking' of
+ * store files is actually done in {@link org.apache.hadoop.hbase.regionserver.HRegionFileSystem}
+ * and {@link HStore} before we have SFT. And since SFM is designed to only holds in memory states,
+ * we will hold write lock when updating it, the lock is also used to protect the normal read/write
+ * requests. This means we'd better not add IO operations to SFM. And also, no matter what the in
+ * memory state is, stripe or not, it does not effect how we track the store files. So consider all
+ * these facts, here we introduce a separated SFT to track the store files.
+ * <p/>
+ * Here, since we always need to update SFM and SFT almost at the same time, we introduce methods in
+ * StoreEngine directly to update them both, so upper layer just need to update StoreEngine once, to
+ * reduce the possible misuse.
  */
 @InterfaceAudience.Private
-public abstract class StoreEngine<SF extends StoreFlusher,
-    CP extends CompactionPolicy, C extends Compactor, SFM extends StoreFileManager> {
+public abstract class StoreEngine<SF extends StoreFlusher, CP extends CompactionPolicy,
+    C extends Compactor, SFM extends StoreFileManager> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(StoreEngine.class);
 
   protected SF storeFlusher;
   protected CP compactionPolicy;
   protected C compactor;
   protected SFM storeFileManager;
+
+  private Configuration conf;
+  private StoreContext ctx;
+  private RegionCoprocessorHost coprocessorHost;
+  private Function<String, ExecutorService> openStoreFileThreadPoolCreator;
+  private StoreFileTracker storeFileTracker;
+
+  private final ReadWriteLock storeLock = new ReentrantReadWriteLock();
 
   /**
-   * The name of the configuration parameter that specifies the class of
-   * a store engine that is used to manage and compact HBase store files.
+   * The name of the configuration parameter that specifies the class of a store engine that is used
+   * to manage and compact HBase store files.
    */
   public static final String STORE_ENGINE_CLASS_KEY = "hbase.hstore.engine.class";
 
-  private static final Class<? extends StoreEngine<?, ?, ?, ?>>
-    DEFAULT_STORE_ENGINE_CLASS = DefaultStoreEngine.class;
+  private static final Class<? extends StoreEngine<?, ?, ?, ?>> DEFAULT_STORE_ENGINE_CLASS =
+    DefaultStoreEngine.class;
+
+  /**
+   * Acquire read lock of this store.
+   */
+  public void readLock() {
+    storeLock.readLock().lock();
+  }
+
+  /**
+   * Release read lock of this store.
+   */
+  public void readUnlock() {
+    storeLock.readLock().unlock();
+  }
+
+  /**
+   * Acquire write lock of this store.
+   */
+  public void writeLock() {
+    storeLock.writeLock().lock();
+  }
+
+  /**
+   * Release write lock of this store.
+   */
+  public void writeUnlock() {
+    storeLock.writeLock().unlock();
+  }
 
   /**
    * @return Compaction policy to use.
@@ -80,6 +173,11 @@ public abstract class StoreEngine<SF extends StoreFlusher,
     return this.storeFlusher;
   }
 
+  private StoreFileTracker createStoreFileTracker(HStore store) {
+    return StoreFileTrackerFactory.create(store.conf, store.getRegionInfo().getTable(),
+      store.isPrimaryReplicaStore(), store.getStoreContext());
+  }
+
   /**
    * @param filesCompacting Files currently compacting
    * @return whether a compaction selection is possible
@@ -87,8 +185,8 @@ public abstract class StoreEngine<SF extends StoreFlusher,
   public abstract boolean needsCompaction(List<HStoreFile> filesCompacting);
 
   /**
-   * Creates an instance of a compaction context specific to this engine.
-   * Doesn't actually select or start a compaction. See CompactionContext class comment.
+   * Creates an instance of a compaction context specific to this engine. Doesn't actually select or
+   * start a compaction. See CompactionContext class comment.
    * @return New CompactionContext object.
    */
   public abstract CompactionContext createCompaction() throws IOException;
@@ -96,36 +194,347 @@ public abstract class StoreEngine<SF extends StoreFlusher,
   /**
    * Create the StoreEngine's components.
   */
-  protected abstract void createComponents(
-      Configuration conf, HStore store, CellComparator cellComparator) throws IOException;
+  protected abstract void createComponents(Configuration conf, HStore store,
+      CellComparator cellComparator) throws IOException;
 
-  private void createComponentsOnce(
-      Configuration conf, HStore store, CellComparator cellComparator) throws IOException {
-    assert compactor == null && compactionPolicy == null
-      && storeFileManager == null && storeFlusher == null;
+  protected final void createComponentsOnce(Configuration conf, HStore store,
+      CellComparator cellComparator) throws IOException {
+    assert compactor == null && compactionPolicy == null && storeFileManager == null &&
+      storeFlusher == null && storeFileTracker == null;
     createComponents(conf, store, cellComparator);
-    assert compactor != null && compactionPolicy != null
-      && storeFileManager != null && storeFlusher != null;
+    this.conf = conf;
+    this.ctx = store.getStoreContext();
+    this.coprocessorHost = store.getHRegion().getCoprocessorHost();
+    this.openStoreFileThreadPoolCreator = store.getHRegion()::getStoreFileOpenAndCloseThreadPool;
+    this.storeFileTracker = createStoreFileTracker(store);
+    assert compactor != null && compactionPolicy != null && storeFileManager != null &&
+      storeFlusher != null && storeFileTracker != null;
+  }
+
+  /**
+   * Create a writer for writing new store files.
+   * @return Writer for a new StoreFile
+   */
+  public StoreFileWriter createWriter(CreateStoreFileWriterParams params) throws IOException {
+    return storeFileTracker.createWriter(params);
+  }
+
+  public HStoreFile createStoreFileAndReader(Path p) throws IOException {
+    StoreFileInfo info = new StoreFileInfo(conf, ctx.getRegionFileSystem().getFileSystem(), p,
+      ctx.isPrimaryReplicaStore());
+    return createStoreFileAndReader(info);
+  }
+
+  public HStoreFile createStoreFileAndReader(StoreFileInfo info) throws IOException {
+    info.setRegionCoprocessorHost(coprocessorHost);
+    HStoreFile storeFile =
+      new HStoreFile(info, ctx.getFamily().getBloomFilterType(), ctx.getCacheConf());
+    storeFile.initReader();
+    return storeFile;
+  }
+
+  /**
+   * Validates a store file by opening and closing it. In HFileV2 this should not be an expensive
+   * operation.
+   * @param path the path to the store file
+   */
+  public void validateStoreFile(Path path) throws IOException {
+    HStoreFile storeFile = null;
+    try {
+      storeFile = createStoreFileAndReader(path);
+    } catch (IOException e) {
+      LOG.error("Failed to open store file : {}, keeping it in tmp location", path, e);
+      throw e;
+    } finally {
+      if (storeFile != null) {
+        storeFile.closeStoreFile(false);
+      }
+    }
+  }
+
+  private List<HStoreFile> openStoreFiles(Collection<StoreFileInfo> files, boolean warmup)
+    throws IOException {
+    if (CollectionUtils.isEmpty(files)) {
+      return Collections.emptyList();
+    }
+    // initialize the thread pool for opening store files in parallel..
+    ExecutorService storeFileOpenerThreadPool =
+      openStoreFileThreadPoolCreator.apply("StoreFileOpener-" +
+        ctx.getRegionInfo().getEncodedName() + "-" + ctx.getFamily().getNameAsString());
+    CompletionService<HStoreFile> completionService =
+      new ExecutorCompletionService<>(storeFileOpenerThreadPool);
+
+    int totalValidStoreFile = 0;
+    for (StoreFileInfo storeFileInfo : files) {
+      // The StoreFileInfo will carry store configuration down to HFile, we need to set it to
+      // our store's CompoundConfiguration here.
+      storeFileInfo.setConf(conf);
+      // open each store file in parallel
+      completionService.submit(() -> createStoreFileAndReader(storeFileInfo));
+      totalValidStoreFile++;
+    }
+
+    Set<String> compactedStoreFiles = new HashSet<>();
+    ArrayList<HStoreFile> results = new ArrayList<>(files.size());
+    IOException ioe = null;
+    try {
+      for (int i = 0; i < totalValidStoreFile; i++) {
+        try {
+          HStoreFile storeFile = completionService.take().get();
+          if (storeFile != null) {
+            LOG.debug("loaded {}", storeFile);
+            results.add(storeFile);
+            compactedStoreFiles.addAll(storeFile.getCompactedStoreFiles());
+          }
+        } catch (InterruptedException e) {
+          if (ioe == null) {
+            ioe = new InterruptedIOException(e.getMessage());
+          }
+        } catch (ExecutionException e) {
+          if (ioe == null) {
+            ioe = new IOException(e.getCause());
+          }
+        }
+      }
+    } finally {
+      storeFileOpenerThreadPool.shutdownNow();
+    }
+    if (ioe != null) {
+      // close StoreFile readers
+      boolean evictOnClose =
+        ctx.getCacheConf() != null ? ctx.getCacheConf().shouldEvictOnClose() : true;
+      for (HStoreFile file : results) {
+        try {
+          if (file != null) {
+            file.closeStoreFile(evictOnClose);
+          }
+        } catch (IOException e) {
+          LOG.warn("Could not close store file {}", file, e);
+        }
+      }
+      throw ioe;
+    }
+
+    // Should not archive the compacted store files when region warmup. See HBASE-22163.
+    if (!warmup) {
+      // Remove the compacted files from result
+      List<HStoreFile> filesToRemove = new ArrayList<>(compactedStoreFiles.size());
+      for (HStoreFile storeFile : results) {
+        if (compactedStoreFiles.contains(storeFile.getPath().getName())) {
+          LOG.warn("Clearing the compacted storefile {} from {}", storeFile, this);
+          storeFile.getReader().close(
+            storeFile.getCacheConf() != null ? storeFile.getCacheConf().shouldEvictOnClose() :
+              true);
+          filesToRemove.add(storeFile);
+        }
+      }
+      results.removeAll(filesToRemove);
+      if (!filesToRemove.isEmpty() && ctx.isPrimaryReplicaStore()) {
+        LOG.debug("Moving the files {} to archive", filesToRemove);
+        ctx.getRegionFileSystem().removeStoreFiles(ctx.getFamily().getNameAsString(),
+          filesToRemove);
+      }
+    }
+
+    return results;
+  }
+
+  public void initialize(boolean warmup) throws IOException {
+    List<StoreFileInfo> fileInfos = storeFileTracker.load();
+    List<HStoreFile> files = openStoreFiles(fileInfos, warmup);
+    storeFileManager.loadFiles(files);
+  }
+
+  public void refreshStoreFiles() throws IOException {
+    List<StoreFileInfo> fileInfos = storeFileTracker.load();
+    refreshStoreFilesInternal(fileInfos);
+  }
+
+  public void refreshStoreFiles(Collection<String> newFiles) throws IOException {
+    List<StoreFileInfo> storeFiles = new ArrayList<>(newFiles.size());
+    for (String file : newFiles) {
+      storeFiles
+        .add(ctx.getRegionFileSystem().getStoreFileInfo(ctx.getFamily().getNameAsString(), file));
+    }
+    refreshStoreFilesInternal(storeFiles);
+  }
+
+  /**
+   * Checks the underlying store files, and opens the files that have not been opened, and removes
+   * the store file readers for store files no longer available. Mainly used by secondary region
+   * replicas to keep up to date with the primary region files.
+   */
+  private void refreshStoreFilesInternal(Collection<StoreFileInfo> newFiles) throws IOException {
+    Collection<HStoreFile> currentFiles = storeFileManager.getStorefiles();
+    Collection<HStoreFile> compactedFiles = storeFileManager.getCompactedfiles();
+    if (currentFiles == null) {
+      currentFiles = Collections.emptySet();
+    }
+    if (newFiles == null) {
+      newFiles = Collections.emptySet();
+    }
+    if (compactedFiles == null) {
+      compactedFiles = Collections.emptySet();
+    }
+
+    HashMap<StoreFileInfo, HStoreFile> currentFilesSet = new HashMap<>(currentFiles.size());
+    for (HStoreFile sf : currentFiles) {
+      currentFilesSet.put(sf.getFileInfo(), sf);
+    }
+    HashMap<StoreFileInfo, HStoreFile> compactedFilesSet = new HashMap<>(compactedFiles.size());
+    for (HStoreFile sf : compactedFiles) {
+      compactedFilesSet.put(sf.getFileInfo(), sf);
+    }
+
+    Set<StoreFileInfo> newFilesSet = new HashSet<StoreFileInfo>(newFiles);
+    // Exclude the files that have already been compacted
+    newFilesSet = Sets.difference(newFilesSet, compactedFilesSet.keySet());
+    Set<StoreFileInfo> toBeAddedFiles = Sets.difference(newFilesSet, currentFilesSet.keySet());
+    Set<StoreFileInfo> toBeRemovedFiles = Sets.difference(currentFilesSet.keySet(), newFilesSet);
+
+    if (toBeAddedFiles.isEmpty() && toBeRemovedFiles.isEmpty()) {
+      return;
+    }
+
+    LOG.info("Refreshing store files for " + this + " files to add: " + toBeAddedFiles +
+      " files to remove: " + toBeRemovedFiles);
+
+    Set<HStoreFile> toBeRemovedStoreFiles = new HashSet<>(toBeRemovedFiles.size());
+    for (StoreFileInfo sfi : toBeRemovedFiles) {
+      toBeRemovedStoreFiles.add(currentFilesSet.get(sfi));
+    }
+
+    // try to open the files
+    List<HStoreFile> openedFiles = openStoreFiles(toBeAddedFiles, false);
+
+    // propogate the file changes to the underlying store file manager
+    replaceStoreFiles(toBeRemovedStoreFiles, openedFiles); // won't throw an exception
+  }
+
+  /**
+   * Commit the given {@code files}.
+   * <p/>
+   * We will move the file into data directory, and open it.
+   * @param files the files want to commit
+   * @param validate whether to validate the store files
+   * @return the committed store files
+   */
+  public List<HStoreFile> commitStoreFiles(List<Path> files, boolean validate) throws IOException {
+    List<HStoreFile> committedFiles = new ArrayList<>(files.size());
+    HRegionFileSystem hfs = ctx.getRegionFileSystem();
+    String familyName = ctx.getFamily().getNameAsString();
+    Path storeDir = hfs.getStoreDir(familyName);
+    for (Path file : files) {
+      try {
+        if (validate) {
+          validateStoreFile(file);
+        }
+        Path committedPath;
+        // As we want to support writing to data directory directly, here we need to check whether
+        // the store file is already in the right place
+        if (file.getParent() != null && file.getParent().equals(storeDir)) {
+          // already in the right place, skip renmaing
+          committedPath = file;
+        } else {
+          // Write-out finished successfully, move into the right spot
+          committedPath = hfs.commitStoreFile(familyName, file);
+        }
+        HStoreFile sf = createStoreFileAndReader(committedPath);
+        committedFiles.add(sf);
+      } catch (IOException e) {
+        LOG.error("Failed to commit store file {}", file, e);
+        // Try to delete the files we have committed before.
+        // It is OK to fail when deleting as leaving the file there does not cause any data
+        // corruption problem. It just introduces some duplicated data which may impact read
+        // performance a little when reading before compaction.
+        for (HStoreFile sf : committedFiles) {
+          Path pathToDelete = sf.getPath();
+          try {
+            sf.deleteStoreFile();
+          } catch (IOException deleteEx) {
+            LOG.warn(HBaseMarkers.FATAL, "Failed to delete committed store file {}", pathToDelete,
+              deleteEx);
+          }
+        }
+        throw new IOException("Failed to commit the flush", e);
+      }
+    }
+    return committedFiles;
+  }
+
+  @FunctionalInterface
+  public interface IOExceptionRunnable {
+    void run() throws IOException;
+  }
+
+  /**
+   * Add the store files to store file manager, and also record it in the store file tracker.
+   * <p/>
+   * The {@code actionAfterAdding} will be executed after the insertion to store file manager, under
+   * the lock protection. Usually this is for clear the memstore snapshot.
+   */
+  public void addStoreFiles(Collection<HStoreFile> storeFiles,
+    IOExceptionRunnable actionAfterAdding) throws IOException {
+    storeFileTracker.add(StoreUtils.toStoreFileInfo(storeFiles));
+    writeLock();
+    try {
+      storeFileManager.insertNewFiles(storeFiles);
+      actionAfterAdding.run();
+    } finally {
+      // We need the lock, as long as we are updating the storeFiles
+      // or changing the memstore. Let us release it before calling
+      // notifyChangeReadersObservers. See HBASE-4485 for a possible
+      // deadlock scenario that could have happened if continue to hold
+      // the lock.
+      writeUnlock();
+    }
+  }
+
+  public void replaceStoreFiles(Collection<HStoreFile> compactedFiles,
+    Collection<HStoreFile> newFiles) throws IOException {
+    storeFileTracker.replace(StoreUtils.toStoreFileInfo(compactedFiles),
+      StoreUtils.toStoreFileInfo(newFiles));
+    writeLock();
+    try {
+      storeFileManager.addCompactionResults(compactedFiles, newFiles);
+    } finally {
+      writeUnlock();
+    }
+  }
+
+  public void removeCompactedFiles(Collection<HStoreFile> compactedFiles) {
+    writeLock();
+    try {
+      storeFileManager.removeCompactedFiles(compactedFiles);
+    } finally {
+      writeUnlock();
+    }
   }
 
   /**
    * Create the StoreEngine configured for the given Store.
-   * @param store The store. An unfortunate dependency needed due to it
-   *        being passed to coprocessors via the compactor.
+   * @param store The store. An unfortunate dependency needed due to it being passed to coprocessors
+   *        via the compactor.
    * @param conf Store configuration.
    * @param cellComparator CellComparator for storeFileManager.
    * @return StoreEngine to use.
    */
-  public static StoreEngine<?, ?, ?, ?> create(
-      HStore store, Configuration conf, CellComparator cellComparator) throws IOException {
+  public static StoreEngine<?, ?, ?, ?> create(HStore store, Configuration conf,
+      CellComparator cellComparator) throws IOException {
     String className = conf.get(STORE_ENGINE_CLASS_KEY, DEFAULT_STORE_ENGINE_CLASS.getName());
     try {
-      StoreEngine<?,?,?,?> se = ReflectionUtils.instantiateWithCustomCtor(
-          className, new Class[] { }, new Object[] { });
+      StoreEngine<?, ?, ?, ?> se =
+        ReflectionUtils.instantiateWithCustomCtor(className, new Class[] {}, new Object[] {});
       se.createComponentsOnce(conf, store, cellComparator);
       return se;
     } catch (Exception e) {
       throw new IOException("Unable to load configured store engine '" + className + "'", e);
     }
   }
+
+  @RestrictedApi(explanation = "Should only be called in TestHStore", link = "",
+    allowedOnPath = ".*/TestHStore.java")
+  ReadWriteLock getLock() {
+    return storeLock;
+  }
 }
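How the upper layer is expected to drive these new entry points is easiest to see in a small sketch. The HStore diff that actually does this is suppressed earlier on this page, so the snippet below is illustrative only, assuming a storeEngine reference and the getStoreFileManager() accessor from the unchanged part of the class.

    // Illustrative flush path: commit the written files, then register them so the
    // StoreFileManager (in-memory view) and StoreFileTracker (persistent list) stay in sync.
    List<HStoreFile> committed = storeEngine.commitStoreFiles(newFilePaths, true);
    storeEngine.addStoreFiles(committed, () -> {
      // runs under the engine's write lock; typically clears the memstore snapshot here
    });

    // Illustrative read path: take the shared lock while looking at the component file list.
    storeEngine.readLock();
    try {
      Collection<HStoreFile> current = storeEngine.getStoreFileManager().getStorefiles();
      // ... build scanners over 'current' ...
    } finally {
      storeEngine.readUnlock();
    }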
@@ -18,6 +18,7 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import com.google.errorprone.annotations.RestrictedApi;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Comparator;
@@ -49,12 +50,16 @@ public interface StoreFileManager {
    * Loads the initial store files into empty StoreFileManager.
    * @param storeFiles The files to load.
    */
+  @RestrictedApi(explanation = "Should only be called in StoreEngine", link = "",
+      allowedOnPath = ".*(/org/apache/hadoop/hbase/regionserver/StoreEngine.java|/src/test/.*)")
   void loadFiles(List<HStoreFile> storeFiles);
 
   /**
    * Adds new files, either for from MemStore flush or bulk insert, into the structure.
    * @param sfs New store files.
    */
+  @RestrictedApi(explanation = "Should only be called in StoreEngine", link = "",
+      allowedOnPath = ".*(/org/apache/hadoop/hbase/regionserver/StoreEngine.java|/src/test/.*)")
   void insertNewFiles(Collection<HStoreFile> sfs);
 
   /**
@@ -62,12 +67,16 @@ public interface StoreFileManager {
    * @param compactedFiles The input files for the compaction.
    * @param results The resulting files for the compaction.
    */
+  @RestrictedApi(explanation = "Should only be called in StoreEngine", link = "",
+      allowedOnPath = ".*(/org/apache/hadoop/hbase/regionserver/StoreEngine.java|/src/test/.*)")
   void addCompactionResults(Collection<HStoreFile> compactedFiles, Collection<HStoreFile> results);
 
   /**
    * Remove the compacted files
    * @param compactedFiles the list of compacted files
    */
+  @RestrictedApi(explanation = "Should only be called in StoreEngine", link = "",
+      allowedOnPath = ".*(/org/apache/hadoop/hbase/regionserver/StoreEngine.java|/src/test/.*)")
   void removeCompactedFiles(Collection<HStoreFile> compactedFiles);
 
   /**
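The practical effect of these @RestrictedApi annotations is compile-time enforcement by error-prone: mutating calls on StoreFileManager made from anywhere other than StoreEngine.java or test sources should be flagged during the build. A hypothetical offender, for illustration only:

    // Hypothetical example; the exact diagnostic wording comes from error-prone's
    // RestrictedApi checker, using the explanation text in the annotation above.
    class SomewhereElseInRegionServer {
      void wrong(StoreFileManager manager, List<HStoreFile> files) {
        manager.loadFiles(files); // expected to fail the build: "Should only be called in StoreEngine"
      }
    }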
@@ -69,10 +69,17 @@ abstract class StoreFlusher {
     writer.close();
   }
 
+  protected final StoreFileWriter createWriter(MemStoreSnapshot snapshot, boolean alwaysIncludesTag)
+    throws IOException {
+    return store.getStoreEngine()
+      .createWriter(CreateStoreFileWriterParams.create().maxKeyCount(snapshot.getCellsCount())
+        .compression(store.getColumnFamilyDescriptor().getCompressionType()).isCompaction(false)
+        .includeMVCCReadpoint(true).includesTag(alwaysIncludesTag || snapshot.isTagsPresent())
+        .shouldDropBehind(false));
+  }
+
   /**
    * Creates the scanner for flushing snapshot. Also calls coprocessors.
-   * @param snapshotScanners
    * @return The scanner; null if coprocessor is canceling the flush.
    */
   protected final InternalScanner createScanner(List<KeyValueScanner> snapshotScanners,
@@ -20,10 +20,13 @@ package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
 import java.util.Collection;
+import java.util.List;
 import java.util.Optional;
 import java.util.OptionalInt;
 import java.util.OptionalLong;
+import java.util.function.Predicate;
+import java.util.function.ToLongFunction;
+import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
@@ -42,10 +45,13 @@ import org.slf4j.LoggerFactory;
  * Utility functions for region server storage layer.
  */
 @InterfaceAudience.Private
-public class StoreUtils {
+public final class StoreUtils {
 
   private static final Logger LOG = LoggerFactory.getLogger(StoreUtils.class);
 
+  private StoreUtils() {
+  }
+
   /**
    * Creates a deterministic hash code for store file collection.
   */
@@ -171,4 +177,31 @@ public class StoreUtils {
     return new CompoundConfiguration().add(conf).addBytesMap(td.getValues())
       .addStringMap(cfd.getConfiguration()).addBytesMap(cfd.getValues());
   }
+
+  public static List<StoreFileInfo> toStoreFileInfo(Collection<HStoreFile> storefiles) {
+    return storefiles.stream().map(HStoreFile::getFileInfo).collect(Collectors.toList());
+  }
+
+  public static long getTotalUncompressedBytes(List<HStoreFile> files) {
+    return files.stream()
+      .mapToLong(file -> getStorefileFieldSize(file, StoreFileReader::getTotalUncompressedBytes))
+      .sum();
+  }
+
+  public static long getStorefilesSize(Collection<HStoreFile> files,
+    Predicate<HStoreFile> predicate) {
+    return files.stream().filter(predicate)
+      .mapToLong(file -> getStorefileFieldSize(file, StoreFileReader::length)).sum();
+  }
+
+  public static long getStorefileFieldSize(HStoreFile file, ToLongFunction<StoreFileReader> f) {
+    if (file == null) {
+      return 0L;
+    }
+    StoreFileReader reader = file.getReader();
+    if (reader == null) {
+      return 0L;
+    }
+    return f.applyAsLong(reader);
+  }
 }
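A small usage sketch of the new static helpers; the real call sites (for example in HStore) are in the suppressed part of this diff, so the method below is illustrative only.

    // Illustrative only: summarizing a store's current files with the new helpers.
    static void logSizes(Collection<HStoreFile> files, Logger log) {
      long onDiskSize = StoreUtils.getStorefilesSize(files, sf -> true);        // sum of reader lengths
      long uncompressedSize = StoreUtils.getTotalUncompressedBytes(new ArrayList<>(files));
      List<StoreFileInfo> infos = StoreUtils.toStoreFileInfo(files);            // what the tracker records
      log.info("files={}, onDiskSize={}, uncompressedSize={}", infos.size(), onDiskSize,
        uncompressedSize);
    }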
@@ -20,20 +20,19 @@ package org.apache.hadoop.hbase.regionserver;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
 import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy;
 import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactor;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.security.User;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
@@ -70,7 +70,7 @@ public class StripeStoreFlusher extends StoreFlusher {
     StripeMultiFileWriter mw = null;
     try {
       mw = req.createWriter(); // Writer according to the policy.
-      StripeMultiFileWriter.WriterFactory factory = createWriterFactory(cellsCount);
+      StripeMultiFileWriter.WriterFactory factory = createWriterFactory(snapshot);
       StoreScanner storeScanner = (scanner instanceof StoreScanner) ? (StoreScanner)scanner : null;
       mw.init(storeScanner, factory);
 
@@ -98,13 +98,12 @@ public class StripeStoreFlusher extends StoreFlusher {
     return result;
   }
 
-  private StripeMultiFileWriter.WriterFactory createWriterFactory(final long kvCount) {
+  private StripeMultiFileWriter.WriterFactory createWriterFactory(MemStoreSnapshot snapshot) {
     return new StripeMultiFileWriter.WriterFactory() {
       @Override
       public StoreFileWriter createWriter() throws IOException {
-        StoreFileWriter writer = store.createWriterInTmp(kvCount,
-          store.getColumnFamilyDescriptor().getCompressionType(), false, true, true, false);
-        return writer;
+        // XXX: it used to always pass true for includesTag, re-consider?
+        return StripeStoreFlusher.this.createWriter(snapshot, true);
       }
     };
   }
@@ -51,13 +51,14 @@ public abstract class AbstractMultiOutputCompactor<T extends AbstractMultiFileWr
     WriterFactory writerFactory = new WriterFactory() {
       @Override
       public StoreFileWriter createWriter() throws IOException {
-        return createTmpWriter(fd, shouldDropBehind, major);
+        return AbstractMultiOutputCompactor.this.createWriter(fd, shouldDropBehind, major);
       }

       @Override
       public StoreFileWriter createWriterWithStoragePolicy(String fileStoragePolicy)
           throws IOException {
-        return createTmpWriter(fd, shouldDropBehind, fileStoragePolicy, major);
+        return AbstractMultiOutputCompactor.this.createWriter(fd, shouldDropBehind,
+          fileStoragePolicy, major);
       }
     };
     // Prepare multi-writer, and perform the compaction using scanner and writer.
@@ -28,7 +28,6 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
@@ -38,6 +37,7 @@ import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileInfo;
 import org.apache.hadoop.hbase.regionserver.CellSink;
+import org.apache.hadoop.hbase.regionserver.CreateStoreFileWriterParams;
 import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.HStoreFile;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
@@ -60,6 +60,7 @@ import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.hbase.thirdparty.com.google.common.io.Closeables;

 /**
@@ -260,29 +261,32 @@ public abstract class Compactor<T extends CellSink> {
     }
   };

-  /**
-   * Creates a writer for a new file in a temporary directory.
-   * @param fd The file details.
-   * @return Writer for a new StoreFile in the tmp dir.
-   * @throws IOException if creation failed
-   */
-  protected final StoreFileWriter createTmpWriter(FileDetails fd, boolean shouldDropBehind, boolean major)
-    throws IOException {
-    // When all MVCC readpoints are 0, don't write them.
-    // See HBASE-8166, HBASE-12600, and HBASE-13389.
-    return store.createWriterInTmp(fd.maxKeyCount,
-      major ? majorCompactionCompression : minorCompactionCompression,
-      true, fd.maxMVCCReadpoint > 0,
-      fd.maxTagsLength > 0, shouldDropBehind, fd.totalCompactedFilesSize,
-      HConstants.EMPTY_STRING);
+  protected final CreateStoreFileWriterParams createParams(FileDetails fd, boolean shouldDropBehind,
+    boolean major) {
+    return CreateStoreFileWriterParams.create().maxKeyCount(fd.maxKeyCount)
+      .compression(major ? majorCompactionCompression : minorCompactionCompression)
+      .isCompaction(true).includeMVCCReadpoint(fd.maxMVCCReadpoint > 0)
+      .includesTag(fd.maxTagsLength > 0).shouldDropBehind(shouldDropBehind)
+      .totalCompactedFilesSize(fd.totalCompactedFilesSize);
   }

-  protected final StoreFileWriter createTmpWriter(FileDetails fd, boolean shouldDropBehind,
-    String fileStoragePolicy, boolean major) throws IOException {
-    return store.createWriterInTmp(fd.maxKeyCount,
-      major ? majorCompactionCompression : minorCompactionCompression,
-      true, fd.maxMVCCReadpoint > 0,
-      fd.maxTagsLength > 0, shouldDropBehind, fd.totalCompactedFilesSize, fileStoragePolicy);
+  /**
+   * Creates a writer for a new file.
+   * @param fd The file details.
+   * @return Writer for a new StoreFile
+   * @throws IOException if creation failed
+   */
+  protected final StoreFileWriter createWriter(FileDetails fd, boolean shouldDropBehind,
+    boolean major) throws IOException {
+    // When all MVCC readpoints are 0, don't write them.
+    // See HBASE-8166, HBASE-12600, and HBASE-13389.
+    return store.getStoreEngine().createWriter(createParams(fd, shouldDropBehind, major));
+  }
+
+  protected final StoreFileWriter createWriter(FileDetails fd, boolean shouldDropBehind,
+    String fileStoragePolicy, boolean major) throws IOException {
+    return store.getStoreEngine()
+      .createWriter(createParams(fd, shouldDropBehind, major).fileStoragePolicy(fileStoragePolicy));
   }

   private ScanInfo preCompactScannerOpen(CompactionRequestImpl request, ScanType scanType,
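Note (editorial sketch, not part of the commit): with the hunk above, a compactor no longer calls the store's positional createWriterInTmp() overloads; it describes the writer declaratively and lets the store engine (and ultimately the configured store file tracker) create it. A minimal sketch of the resulting call pattern, assuming access to the surrounding Compactor fields and parameters (store, fd, major, shouldDropBehind, majorCompactionCompression, minorCompactionCompression):

  // Sketch only: mirrors the createParams()/createWriter() pair introduced above.
  CreateStoreFileWriterParams params = CreateStoreFileWriterParams.create()
    .maxKeyCount(fd.maxKeyCount)
    .compression(major ? majorCompactionCompression : minorCompactionCompression)
    .isCompaction(true)
    .includeMVCCReadpoint(fd.maxMVCCReadpoint > 0)
    .includesTag(fd.maxTagsLength > 0)
    .shouldDropBehind(shouldDropBehind)
    .totalCompactedFilesSize(fd.totalCompactedFilesSize);
  StoreFileWriter writer = store.getStoreEngine().createWriter(params);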
@@ -45,14 +45,14 @@ public class DefaultCompactor extends Compactor<StoreFileWriter> {
   }

   private final CellSinkFactory<StoreFileWriter> writerFactory =
     new CellSinkFactory<StoreFileWriter>() {
       @Override
       public StoreFileWriter createWriter(InternalScanner scanner,
         org.apache.hadoop.hbase.regionserver.compactions.Compactor.FileDetails fd,
         boolean shouldDropBehind, boolean major) throws IOException {
-        return createTmpWriter(fd, shouldDropBehind, major);
+        return DefaultCompactor.this.createWriter(fd, shouldDropBehind, major);
       }
     };

   /**
    * Do a minor/major compaction on an explicit set of storefiles from a Store.
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.storefiletracker;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.regionserver.StoreContext;
+import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * The default implementation for store file tracker, where we do not persist the store file list,
+ * and use listing when loading store files.
+ */
+@InterfaceAudience.Private
+class DefaultStoreFileTracker extends StoreFileTrackerBase {
+
+  public DefaultStoreFileTracker(Configuration conf, TableName tableName, boolean isPrimaryReplica,
+    StoreContext ctx) {
+    super(conf, tableName, isPrimaryReplica, ctx);
+  }
+
+  @Override
+  public List<StoreFileInfo> load() throws IOException {
+    return ctx.getRegionFileSystem().getStoreFiles(ctx.getFamily().getNameAsString());
+  }
+
+  @Override
+  public boolean requireWritingToTmpDirFirst() {
+    return true;
+  }
+
+  @Override
+  protected void doAddNewStoreFiles(Collection<StoreFileInfo> newFiles) throws IOException {
+    // NOOP
+  }
+
+  @Override
+  protected void doAddCompactionResults(Collection<StoreFileInfo> compactedFiles,
+    Collection<StoreFileInfo> newFiles) throws IOException {
+    // NOOP
+  }
+}
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.storefiletracker;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import org.apache.hadoop.hbase.regionserver.CreateStoreFileWriterParams;
+import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * An interface to define how we track the store files for a given store.
+ * <p/>
+ * In the old design, we write the store file to a tmp directory first, and then rename it to the
+ * actual data file. Once a store file is under the data directory, we consider it 'committed', and
+ * we need to do a listing when loading store files.
+ * <p/>
+ * Now that the cloud age is coming, we want to store the store files on object storage, where rename
+ * and list are not as cheap as on HDFS, especially rename. Although introducing a metadata
+ * management layer for object storage could solve the problem, we still want HBase to run on
+ * pure object storage, so here we introduce this interface to abstract how we track the store
+ * files. For the old implementation, we just persist nothing here, and do a listing to load store
+ * files. When running on object storage, we could persist the store file list in a system region,
+ * or in a file on the object storage, to make it possible to write directly into the data directory
+ * to avoid renaming, and also avoid listing when loading store files.
+ * <p/>
+ * The implementation is required to be thread safe, as flush and compaction may occur at the same time,
+ * and we could also run multiple compactions at the same time. As the implementation may choose to
+ * persist the store file list to external storage, which could be slow, it is the duty of the
+ * callers to not call it inside a lock which may block normal read/write requests.
+ */
+@InterfaceAudience.Private
+public interface StoreFileTracker {
+
+  /**
+   * Load the store files list when opening a region.
+   */
+  List<StoreFileInfo> load() throws IOException;
+
+  /**
+   * Add new store files.
+   * <p/>
+   * Used for flush and bulk load.
+   */
+  void add(Collection<StoreFileInfo> newFiles) throws IOException;
+
+  /**
+   * Add new store files and remove compacted store files after compaction.
+   */
+  void replace(Collection<StoreFileInfo> compactedFiles, Collection<StoreFileInfo> newFiles)
+    throws IOException;
+
+  /**
+   * Create a writer for writing new store files.
+   * @return Writer for a new StoreFile
+   */
+  StoreFileWriter createWriter(CreateStoreFileWriterParams params) throws IOException;
+}
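Note (editorial sketch, not part of the commit): the interface javadoc above describes when each method is meant to be called but does not show the call pattern. A minimal sketch of how a store-level caller would drive a tracker; the tracker instance and the collections of StoreFileInfo (flushedFiles, compactedFiles, newCompactedOutput) and the CreateStoreFileWriterParams value are assumed to come from the surrounding store code:

  // Sketch only: the lifecycle implied by the interface above.
  List<StoreFileInfo> files = tracker.load();            // when opening the region/store
  tracker.add(flushedFiles);                             // after a flush or bulk load commits
  tracker.replace(compactedFiles, newCompactedOutput);   // after a compaction finishes
  StoreFileWriter writer = tracker.createWriter(params); // when a new store file is written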
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.storefiletracker;
+
+import java.io.IOException;
+import java.util.Collection;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.io.crypto.Encryption;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.regionserver.CreateStoreFileWriterParams;
+import org.apache.hadoop.hbase.regionserver.StoreContext;
+import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
+import org.apache.hadoop.hbase.regionserver.StoreUtils;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base class for all store file trackers.
+ * <p/>
+ * Mainly used to place the common logic for skipping persistence on secondary replicas.
+ */
+@InterfaceAudience.Private
+abstract class StoreFileTrackerBase implements StoreFileTracker {
+
+  private static final Logger LOG = LoggerFactory.getLogger(StoreFileTrackerBase.class);
+
+  protected final Configuration conf;
+
+  protected final TableName tableName;
+
+  protected final boolean isPrimaryReplica;
+
+  protected final StoreContext ctx;
+
+  private volatile boolean cacheOnWriteLogged;
+
+  protected StoreFileTrackerBase(Configuration conf, TableName tableName, boolean isPrimaryReplica,
+    StoreContext ctx) {
+    this.conf = conf;
+    this.tableName = tableName;
+    this.isPrimaryReplica = isPrimaryReplica;
+    this.ctx = ctx;
+  }
+
+  @Override
+  public final void add(Collection<StoreFileInfo> newFiles) throws IOException {
+    if (isPrimaryReplica) {
+      doAddNewStoreFiles(newFiles);
+    }
+  }
+
+  @Override
+  public final void replace(Collection<StoreFileInfo> compactedFiles,
+    Collection<StoreFileInfo> newFiles) throws IOException {
+    if (isPrimaryReplica) {
+      doAddCompactionResults(compactedFiles, newFiles);
+    }
+  }
+
+  private HFileContext createFileContext(Compression.Algorithm compression,
+    boolean includeMVCCReadpoint, boolean includesTag, Encryption.Context encryptionContext) {
+    if (compression == null) {
+      compression = HFile.DEFAULT_COMPRESSION_ALGORITHM;
+    }
+    ColumnFamilyDescriptor family = ctx.getFamily();
+    HFileContext hFileContext = new HFileContextBuilder().withIncludesMvcc(includeMVCCReadpoint)
+      .withIncludesTags(includesTag).withCompression(compression)
+      .withCompressTags(family.isCompressTags()).withChecksumType(StoreUtils.getChecksumType(conf))
+      .withBytesPerCheckSum(StoreUtils.getBytesPerChecksum(conf))
+      .withBlockSize(family.getBlocksize()).withHBaseCheckSum(true)
+      .withDataBlockEncoding(family.getDataBlockEncoding()).withEncryptionContext(encryptionContext)
+      .withCreateTime(EnvironmentEdgeManager.currentTime()).withColumnFamily(family.getName())
+      .withTableName(tableName.getName()).withCellComparator(ctx.getComparator()).build();
+    return hFileContext;
+  }
+
+  @Override
+  public final StoreFileWriter createWriter(CreateStoreFileWriterParams params)
+    throws IOException {
+    if (!isPrimaryReplica) {
+      throw new IllegalStateException("Should not call create writer on secondary replicas");
+    }
+    // creating new cache config for each new writer
+    final CacheConfig cacheConf = ctx.getCacheConf();
+    final CacheConfig writerCacheConf = new CacheConfig(cacheConf);
+    long totalCompactedFilesSize = params.totalCompactedFilesSize();
+    if (params.isCompaction()) {
+      // Don't cache data on write on compactions, unless specifically configured to do so
+      // Cache only when total file size remains lower than configured threshold
+      final boolean cacheCompactedBlocksOnWrite = cacheConf.shouldCacheCompactedBlocksOnWrite();
+      // if data blocks are to be cached on write
+      // during compaction, we should forcefully
+      // cache index and bloom blocks as well
+      if (cacheCompactedBlocksOnWrite &&
+        totalCompactedFilesSize <= cacheConf.getCacheCompactedBlocksOnWriteThreshold()) {
+        writerCacheConf.enableCacheOnWrite();
+        if (!cacheOnWriteLogged) {
+          LOG.info("For {} , cacheCompactedBlocksOnWrite is true, hence enabled " +
+            "cacheOnWrite for Data blocks, Index blocks and Bloom filter blocks", this);
+          cacheOnWriteLogged = true;
+        }
+      } else {
+        writerCacheConf.setCacheDataOnWrite(false);
+        if (totalCompactedFilesSize > cacheConf.getCacheCompactedBlocksOnWriteThreshold()) {
+          // checking condition once again for logging
+          LOG.debug(
+            "For {}, setting cacheCompactedBlocksOnWrite as false as total size of compacted " +
+              "files - {}, is greater than cacheCompactedBlocksOnWriteThreshold - {}",
+            this, totalCompactedFilesSize, cacheConf.getCacheCompactedBlocksOnWriteThreshold());
+        }
+      }
+    } else {
+      final boolean shouldCacheDataOnWrite = cacheConf.shouldCacheDataOnWrite();
+      if (shouldCacheDataOnWrite) {
+        writerCacheConf.enableCacheOnWrite();
+        if (!cacheOnWriteLogged) {
+          LOG.info("For {} , cacheDataOnWrite is true, hence enabled cacheOnWrite for " +
+            "Index blocks and Bloom filter blocks", this);
+          cacheOnWriteLogged = true;
+        }
+      }
+    }
+    Encryption.Context encryptionContext = ctx.getEncryptionContext();
+    HFileContext hFileContext = createFileContext(params.compression(),
+      params.includeMVCCReadpoint(), params.includesTag(), encryptionContext);
+    Path outputDir;
+    if (requireWritingToTmpDirFirst()) {
+      outputDir =
+        new Path(ctx.getRegionFileSystem().getTempDir(), ctx.getFamily().getNameAsString());
+    } else {
+      throw new UnsupportedOperationException("not supported yet");
+    }
+    StoreFileWriter.Builder builder =
+      new StoreFileWriter.Builder(conf, writerCacheConf, ctx.getRegionFileSystem().getFileSystem())
+        .withOutputDir(outputDir).withBloomType(ctx.getBloomFilterType())
+        .withMaxKeyCount(params.maxKeyCount()).withFavoredNodes(ctx.getFavoredNodes())
+        .withFileContext(hFileContext).withShouldDropCacheBehind(params.shouldDropBehind())
+        .withCompactedFilesSupplier(ctx.getCompactedFilesSupplier())
+        .withFileStoragePolicy(params.fileStoragePolicy());
+    return builder.build();
+  }
+
+  /**
+   * Whether the implementation of this tracker requires you to write to temp directory first, i.e.,
+   * does not allow broken store files under the actual data directory.
+   */
+  protected abstract boolean requireWritingToTmpDirFirst();
+
+  protected abstract void doAddNewStoreFiles(Collection<StoreFileInfo> newFiles) throws IOException;
+
+  protected abstract void doAddCompactionResults(Collection<StoreFileInfo> compactedFiles,
+    Collection<StoreFileInfo> newFiles) throws IOException;
+}
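Note (hypothetical sketch, not part of the commit): the base class above leaves three hooks for implementers. A made-up tracker that keeps the live file list in memory, placed in the same storefiletracker package and using the same imports as DefaultStoreFileTracker plus java.util.ArrayList and java.util.List, could look like this:

  // Hypothetical example only; class name and behavior are illustrative.
  class InMemoryListStoreFileTracker extends StoreFileTrackerBase {

    private final List<StoreFileInfo> trackedFiles = new ArrayList<>();

    InMemoryListStoreFileTracker(Configuration conf, TableName tableName,
      boolean isPrimaryReplica, StoreContext ctx) {
      super(conf, tableName, isPrimaryReplica, ctx);
    }

    @Override
    public synchronized List<StoreFileInfo> load() throws IOException {
      // A real implementation would read a persisted manifest here.
      return new ArrayList<>(trackedFiles);
    }

    @Override
    protected boolean requireWritingToTmpDirFirst() {
      // As of this commit the base class only supports writing to the tmp dir first
      // (createWriter throws "not supported yet" otherwise), so keep returning true.
      return true;
    }

    @Override
    protected synchronized void doAddNewStoreFiles(Collection<StoreFileInfo> newFiles)
      throws IOException {
      trackedFiles.addAll(newFiles);
    }

    @Override
    protected synchronized void doAddCompactionResults(Collection<StoreFileInfo> compactedFiles,
      Collection<StoreFileInfo> newFiles) throws IOException {
      trackedFiles.removeAll(compactedFiles);
      trackedFiles.addAll(newFiles);
    }
  }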
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.storefiletracker;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.regionserver.StoreContext;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Factory method for creating store file tracker.
+ */
+@InterfaceAudience.Private
+public final class StoreFileTrackerFactory {
+
+  public static StoreFileTracker create(Configuration conf, TableName tableName,
+    boolean isPrimaryReplica, StoreContext ctx) {
+    return new DefaultStoreFileTracker(conf, tableName, isPrimaryReplica, ctx);
+  }
+}
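Note (editorial sketch, not part of the commit): the factory currently hands out the single default implementation. A caller inside the region server would obtain and use a tracker roughly as below; the conf, tableName and storeContext values are assumed to come from the owning store:

  // Sketch only: everything except the factory call itself is assumed context.
  StoreFileTracker tracker =
    StoreFileTrackerFactory.create(conf, tableName, /* isPrimaryReplica */ true, storeContext);
  List<StoreFileInfo> currentFiles = tracker.load();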
@@ -101,7 +101,6 @@ class MajorCompactionRequest {

   boolean shouldCFBeCompacted(HRegionFileSystem fileSystem, String family, long ts)
       throws IOException {
-
     // do we have any store files?
     Collection<StoreFileInfo> storeFiles = fileSystem.getStoreFiles(family);
     if (storeFiles == null) {
@@ -210,11 +210,13 @@ public class TestIOFencing {

     @Override
     protected void refreshStoreSizeAndTotalBytes() throws IOException {
-      try {
-        r.compactionsWaiting.countDown();
-        r.compactionsBlocked.await();
-      } catch (InterruptedException ex) {
-        throw new IOException(ex);
+      if (r != null) {
+        try {
+          r.compactionsWaiting.countDown();
+          r.compactionsBlocked.await();
+        } catch (InterruptedException ex) {
+          throw new IOException(ex);
+        }
       }
       super.refreshStoreSizeAndTotalBytes();
     }
@@ -215,8 +215,10 @@ public class TestCacheOnWriteInSchema {
   @Test
   public void testCacheOnWriteInSchema() throws IOException {
     // Write some random data into the store
-    StoreFileWriter writer = store.createWriterInTmp(Integer.MAX_VALUE,
-      HFile.DEFAULT_COMPRESSION_ALGORITHM, false, true, false, false);
+    StoreFileWriter writer = store.getStoreEngine()
+      .createWriter(CreateStoreFileWriterParams.create().maxKeyCount(Integer.MAX_VALUE)
+        .compression(HFile.DEFAULT_COMPRESSION_ALGORITHM).isCompaction(false)
+        .includeMVCCReadpoint(true).includesTag(false).shouldDropBehind(false));
     writeStoreFile(writer);
     writer.close();
     // Verify the block types of interest were cached on write
@@ -21,7 +21,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.CellComparatorImpl;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
 import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
 import org.apache.hadoop.hbase.regionserver.compactions.RatioBasedCompactionPolicy;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
@@ -65,9 +65,12 @@ public class TestDefaultStoreEngine {
         DummyCompactionPolicy.class.getName());
     conf.set(DefaultStoreEngine.DEFAULT_STORE_FLUSHER_CLASS_KEY,
         DummyStoreFlusher.class.getName());
+    HRegion mockRegion = Mockito.mock(HRegion.class);
     HStore mockStore = Mockito.mock(HStore.class);
-    Mockito.when(mockStore.getRegionInfo()).thenReturn(HRegionInfo.FIRST_META_REGIONINFO);
-    StoreEngine<?, ?, ?, ?> se = StoreEngine.create(mockStore, conf, CellComparatorImpl.COMPARATOR);
+    Mockito.when(mockStore.getRegionInfo()).thenReturn(RegionInfoBuilder.FIRST_META_REGIONINFO);
+    Mockito.when(mockStore.getHRegion()).thenReturn(mockRegion);
+    StoreEngine<?, ?, ?, ?> se =
+        StoreEngine.create(mockStore, conf, CellComparatorImpl.COMPARATOR);
     Assert.assertTrue(se instanceof DefaultStoreEngine);
     Assert.assertTrue(se.getCompactionPolicy() instanceof DummyCompactionPolicy);
     Assert.assertTrue(se.getStoreFlusher() instanceof DummyStoreFlusher);
@@ -5746,7 +5746,7 @@ public class TestHRegion {
     Collection<HStoreFile> storeFiles = primaryRegion.getStore(families[0]).getStorefiles();
     primaryRegion.getRegionFileSystem().removeStoreFiles(Bytes.toString(families[0]), storeFiles);
     Collection<StoreFileInfo> storeFileInfos = primaryRegion.getRegionFileSystem()
-        .getStoreFiles(families[0]);
+        .getStoreFiles(Bytes.toString(families[0]));
     Assert.assertTrue(storeFileInfos == null || storeFileInfos.isEmpty());

     verifyData(secondaryRegion, 0, 1000, cq, families);
@@ -7647,7 +7647,7 @@ public class TestHRegion {
         getCacheConfig() != null? getCacheConfig().shouldEvictOnClose(): true;
       for (Path newFile : newFiles) {
         // Create storefile around what we wrote with a reader on it.
-        HStoreFile sf = createStoreFileAndReader(newFile);
+        HStoreFile sf = storeEngine.createStoreFileAndReader(newFile);
         sf.closeStoreFile(evictOnClose);
         sfs.add(sf);
       }
@@ -53,8 +53,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.IntBinaryOperator;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -315,7 +315,7 @@ public class TestHStore {

   /**
    * Verify that compression and data block encoding are respected by the
-   * Store.createWriterInTmp() method, used on store flush.
+   * createWriter method, used on store flush.
    */
   @Test
   public void testCreateWriter() throws Exception {
@@ -327,9 +327,11 @@ public class TestHStore {
         .build();
     init(name.getMethodName(), conf, hcd);

-    // Test createWriterInTmp()
-    StoreFileWriter writer =
-        store.createWriterInTmp(4, hcd.getCompressionType(), false, true, false, false);
+    // Test createWriter
+    StoreFileWriter writer = store.getStoreEngine()
+      .createWriter(CreateStoreFileWriterParams.create().maxKeyCount(4)
+        .compression(hcd.getCompressionType()).isCompaction(false).includeMVCCReadpoint(true)
+        .includesTag(false).shouldDropBehind(false));
     Path path = writer.getPath();
     writer.append(new KeyValue(row, family, qf1, Bytes.toBytes(1)));
     writer.append(new KeyValue(row, family, qf2, Bytes.toBytes(2)));
@@ -1027,19 +1029,19 @@ public class TestHStore {
     // add one more file
     addStoreFile();

-    HStore spiedStore = spy(store);
+    StoreEngine<?, ?, ?, ?> spiedStoreEngine = spy(store.getStoreEngine());

     // call first time after files changed
-    spiedStore.refreshStoreFiles();
+    spiedStoreEngine.refreshStoreFiles();
     assertEquals(2, this.store.getStorefilesCount());
-    verify(spiedStore, times(1)).replaceStoreFiles(any(), any());
+    verify(spiedStoreEngine, times(1)).replaceStoreFiles(any(), any());

     // call second time
-    spiedStore.refreshStoreFiles();
+    spiedStoreEngine.refreshStoreFiles();

     // ensure that replaceStoreFiles is not called, i.e, the times does not change, if files are not
     // refreshed,
-    verify(spiedStore, times(1)).replaceStoreFiles(any(), any());
+    verify(spiedStoreEngine, times(1)).replaceStoreFiles(any(), any());
   }

   private long countMemStoreScanner(StoreScanner scanner) {
@@ -1650,7 +1652,7 @@ public class TestHStore {
     // Do compaction
     MyThread thread = new MyThread(storeScanner);
     thread.start();
-    store.replaceStoreFiles(actualStorefiles, actualStorefiles1);
+    store.replaceStoreFiles(actualStorefiles, actualStorefiles1, false);
     thread.join();
     KeyValueHeap heap2 = thread.getHeap();
     assertFalse(heap.equals(heap2));
@@ -1729,8 +1731,10 @@ public class TestHStore {
   @Test
   public void testHFileContextSetWithCFAndTable() throws Exception {
     init(this.name.getMethodName());
-    StoreFileWriter writer = store.createWriterInTmp(10000L,
-        Compression.Algorithm.NONE, false, true, false, true);
+    StoreFileWriter writer = store.getStoreEngine()
+      .createWriter(CreateStoreFileWriterParams.create().maxKeyCount(10000L)
+        .compression(Compression.Algorithm.NONE).isCompaction(true).includeMVCCReadpoint(true)
+        .includesTag(false).shouldDropBehind(true));
     HFileContext hFileContext = writer.getHFileWriter().getFileContext();
     assertArrayEquals(family, hFileContext.getColumnFamily());
     assertArrayEquals(table, hFileContext.getTableName());
@@ -3277,7 +3281,8 @@ public class TestHStore {
         int currentCount = clearSnapshotCounter.incrementAndGet();
         if (currentCount == 1) {
           try {
-            if (store.lock.isWriteLockedByCurrentThread()) {
+            if (((ReentrantReadWriteLock) store.getStoreEngine().getLock())
+              .isWriteLockedByCurrentThread()) {
               shouldWait = false;
             }
             /**
@@ -245,7 +245,7 @@ public class TestRegionMergeTransactionOnCluster {
       TEST_UTIL.getConfiguration(), fs, tabledir, mergedRegionInfo);
     int count = 0;
     for(ColumnFamilyDescriptor colFamily : columnFamilies) {
-      count += hrfs.getStoreFiles(colFamily.getName()).size();
+      count += hrfs.getStoreFiles(colFamily.getNameAsString()).size();
     }
     ADMIN.compactRegion(mergedRegionInfo.getRegionName());
     // clean up the merged region store files
@@ -254,7 +254,7 @@ public class TestRegionMergeTransactionOnCluster {
     int newcount = 0;
     while (EnvironmentEdgeManager.currentTime() < timeout) {
       for(ColumnFamilyDescriptor colFamily : columnFamilies) {
-        newcount += hrfs.getStoreFiles(colFamily.getName()).size();
+        newcount += hrfs.getStoreFiles(colFamily.getNameAsString()).size();
       }
       if(newcount > count) {
         break;
@@ -273,7 +273,7 @@ public class TestRegionMergeTransactionOnCluster {
     while (EnvironmentEdgeManager.currentTime() < timeout) {
       int newcount1 = 0;
       for(ColumnFamilyDescriptor colFamily : columnFamilies) {
-        newcount1 += hrfs.getStoreFiles(colFamily.getName()).size();
+        newcount1 += hrfs.getStoreFiles(colFamily.getNameAsString()).size();
       }
       if(newcount1 <= 1) {
         break;
@@ -26,7 +26,6 @@ import static org.mockito.Mockito.when;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.List;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -98,7 +97,7 @@ public class TestStoreFileRefresherChore {
     }

     @Override
-    public Collection<StoreFileInfo> getStoreFiles(String familyName) throws IOException {
+    public List<StoreFileInfo> getStoreFiles(String familyName) throws IOException {
       if (fail) {
         throw new IOException("simulating FS failure");
       }
@@ -29,7 +29,6 @@ import java.util.NavigableSet;
 import java.util.Random;
 import java.util.TreeSet;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -126,13 +125,12 @@ public class TestStoreScannerClosure {
     p.addColumn(fam, Bytes.toBytes("q1"), Bytes.toBytes("val"));
     region.put(p);
     HStore store = region.getStore(fam);
-    ReentrantReadWriteLock lock = store.lock;
     // use the lock to manually get a new memstore scanner. this is what
     // HStore#notifyChangedReadersObservers does under the lock.(lock is not needed here
     //since it is just a testcase).
-    lock.readLock().lock();
+    store.getStoreEngine().readLock();
     final List<KeyValueScanner> memScanners = store.memstore.getScanners(Long.MAX_VALUE);
-    lock.readLock().unlock();
+    store.getStoreEngine().readUnlock();
     Thread closeThread = new Thread() {
       public void run() {
         // close should be completed
@@ -20,9 +20,9 @@ package org.apache.hadoop.hbase.regionserver;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyInt;
-import static org.mockito.Matchers.anyLong;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -36,7 +36,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.CellComparatorImpl;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
 import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy;
@@ -118,8 +118,10 @@ public class TestStripeStoreEngine {
   }

   private static TestStoreEngine createEngine(Configuration conf) throws Exception {
+    HRegion region = mock(HRegion.class);
     HStore store = mock(HStore.class);
-    when(store.getRegionInfo()).thenReturn(HRegionInfo.FIRST_META_REGIONINFO);
+    when(store.getRegionInfo()).thenReturn(RegionInfoBuilder.FIRST_META_REGIONINFO);
+    when(store.getHRegion()).thenReturn(region);
     CellComparatorImpl kvComparator = mock(CellComparatorImpl.class);
     return (TestStoreEngine)StoreEngine.create(store, conf, kvComparator);
   }
@@ -22,9 +22,6 @@ import static org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.cre
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyBoolean;
-import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;

@@ -41,14 +38,16 @@ import org.apache.hadoop.hbase.CellComparatorImpl;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.regionserver.CreateStoreFileWriterParams;
 import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.HStoreFile;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.ScanInfo;
 import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.hadoop.hbase.regionserver.StoreEngine;
 import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
 import org.apache.hadoop.hbase.regionserver.StoreUtils;
 import org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.Scanner;
@@ -107,11 +106,10 @@ public class TestDateTieredCompactor {
     when(store.getScanInfo()).thenReturn(si);
     when(store.areWritesEnabled()).thenReturn(true);
     when(store.getFileSystem()).thenReturn(mock(FileSystem.class));
-    when(store.getRegionInfo()).thenReturn(new HRegionInfo(TABLE_NAME));
-    when(store.createWriterInTmp(anyLong(), any(), anyBoolean(),
-      anyBoolean(), anyBoolean(), anyBoolean())).thenAnswer(writers);
-    when(store.createWriterInTmp(anyLong(), any(), anyBoolean(),
-      anyBoolean(), anyBoolean(), anyBoolean(), anyLong(), anyString())).thenAnswer(writers);
+    when(store.getRegionInfo()).thenReturn(RegionInfoBuilder.newBuilder(TABLE_NAME).build());
+    StoreEngine storeEngine = mock(StoreEngine.class);
+    when(storeEngine.createWriter(any(CreateStoreFileWriterParams.class))).thenAnswer(writers);
+    when(store.getStoreEngine()).thenReturn(storeEngine);
     when(store.getComparator()).thenReturn(CellComparatorImpl.COMPARATOR);
     OptionalLong maxSequenceId = StoreUtils.getMaxSequenceIdInList(storefiles);
     when(store.getMaxSequenceId()).thenReturn(maxSequenceId);
@@ -30,7 +30,6 @@ import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.argThat;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.ArgumentMatchers.isNull;
@@ -58,6 +57,7 @@ import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.io.TimeRange;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.hbase.regionserver.CreateStoreFileWriterParams;
 import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.HStoreFile;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
@@ -65,6 +65,7 @@ import org.apache.hadoop.hbase.regionserver.ScanInfo;
 import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.ScannerContext;
 import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
+import org.apache.hadoop.hbase.regionserver.StoreEngine;
 import org.apache.hadoop.hbase.regionserver.StoreFileReader;
 import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
 import org.apache.hadoop.hbase.regionserver.StripeMultiFileWriter;
@@ -861,12 +862,9 @@ public class TestStripeCompactionPolicy {
     when(info.getRegionNameAsString()).thenReturn("testRegion");
     when(store.getColumnFamilyDescriptor()).thenReturn(col);
     when(store.getRegionInfo()).thenReturn(info);
-    when(
-      store.createWriterInTmp(anyLong(), any(), anyBoolean(),
-        anyBoolean(), anyBoolean(), anyBoolean())).thenAnswer(writers);
-    when(
-      store.createWriterInTmp(anyLong(), any(), anyBoolean(),
-        anyBoolean(), anyBoolean(), anyBoolean(), anyLong(), anyString())).thenAnswer(writers);
+    StoreEngine storeEngine = mock(StoreEngine.class);
+    when(storeEngine.createWriter(any(CreateStoreFileWriterParams.class))).thenAnswer(writers);
+    when(store.getStoreEngine()).thenReturn(storeEngine);

     Configuration conf = HBaseConfiguration.create();
     conf.setBoolean("hbase.regionserver.compaction.private.readers", usePrivateReaders);
@@ -21,9 +21,6 @@ import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.OPEN_K
 import static org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.createDummyRequest;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyBoolean;
-import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;

@@ -39,13 +36,15 @@ import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.regionserver.CreateStoreFileWriterParams;
 import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.ScanInfo;
 import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.hadoop.hbase.regionserver.StoreEngine;
 import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
 import org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.Scanner;
 import org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.StoreFileWritersCapture;
@@ -206,11 +205,10 @@ public class TestStripeCompactor {
     when(store.getScanInfo()).thenReturn(si);
     when(store.areWritesEnabled()).thenReturn(true);
     when(store.getFileSystem()).thenReturn(mock(FileSystem.class));
-    when(store.getRegionInfo()).thenReturn(new HRegionInfo(TABLE_NAME));
-    when(store.createWriterInTmp(anyLong(), any(), anyBoolean(),
-      anyBoolean(), anyBoolean(), anyBoolean())).thenAnswer(writers);
-    when(store.createWriterInTmp(anyLong(), any(), anyBoolean(),
-      anyBoolean(), anyBoolean(), anyBoolean(), anyLong(), anyString())).thenAnswer(writers);
+    when(store.getRegionInfo()).thenReturn(RegionInfoBuilder.newBuilder(TABLE_NAME).build());
+    StoreEngine storeEngine = mock(StoreEngine.class);
+    when(storeEngine.createWriter(any(CreateStoreFileWriterParams.class))).thenAnswer(writers);
+    when(store.getStoreEngine()).thenReturn(storeEngine);
     when(store.getComparator()).thenReturn(CellComparatorImpl.COMPARATOR);

     return new StripeCompactor(conf, store) {