HBASE-24289 Heterogeneous Storage for Date Tiered Compaction (#1730)

Signed-off-by: Guanghao Zhang <zghao@apache.org>
Signed-off-by: Duo Zhang <zhangduo@apache.org>
pengmq1 2020-06-30 15:10:04 +08:00 committed by GitHub
parent 982bd5fadd
commit be57e40f36
18 changed files with 539 additions and 25 deletions


@ -0,0 +1,125 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
# Heterogeneous Storage for Date Tiered Compaction
## Objective
Extend Date Tiered Compaction ([HBASE-15181](https://issues.apache.org/jira/browse/HBASE-15181))
with cold/hot data separation, i.e. support different storage policies for data from different
time periods to get better performance. For example, we can keep the last month of data on SSD
while data older than one month stays on HDD.
+ Date Tiered Compaction (DTCP) is date-aware, and we want to build on that to separate cold
and hot data on heterogeneous storage by setting different storage policies (in HDFS) for data
in different time windows.
+ DTCP already defines time windows, and we can classify each window by its timestamp,
for example as a HOT, WARM, or COLD window.
+ DTCP assigns storefiles to windows and performs minor compaction within a time window. The
storefile generated by a compaction uses the storage policy of its window; for example, the
output of compacting a HOT window can be stored on SSD. The WAL and a whole column family can
already be given a storage policy (HBASE-12848, HBASE-14061); our goal is to achieve cold/hot
separation inside a single column family or region by using different storage policies.
## Definition of hot and cold data
Typically the data of the last 3 days can be defined as `HOT data`, i.e. hot age = 3 days.
If the write timestamp of a cell is greater than (now - hot age), we consider that cell HOT.
Warm age is defined in the same way, and each cell belongs to exactly one category:
if its timestamp is less than (now - warm age), we consider it COLD.
```
if timestamp >= (now - hot age), HOT data
else if timestamp >= (now - warm age), WARM data
else COLD data
```
## Time window
Here `now` is the time at which the compaction occurs. DTCP automatically calculates each
window and its size, and window boundaries are rounded according to the base window size.
Assume the base window size is 1 hour, each tier has 3 windows, and the current time is
between 12:00 and 13:00. We define three types of window (`HOT, WARM, COLD`); the type of a
window is determined by its start timestamp and the current time.
As shown in figure 1 below, the type of each window is given by the age range
(hot / warm / cold) into which (now - window.startTimestamp) falls. A cold age does not need
to be configured; it defaults to Long.MAX_VALUE, meaning that windows with very old start
timestamps are COLD windows.
![figure 1](https://raw.githubusercontent.com/pengmq1/images/master/F1-HDTCP.png "figure 1")
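The window classification above maps directly onto a storage-policy lookup. The following is a
minimal sketch of that mapping, mirroring the `getWindowStoragePolicy` method this change adds
to `DateTieredCompactionPolicy`; the policy names returned here are only the illustrative
defaults, in practice they come from `CompactionConfiguration`.
```java
// Sketch: choose a storage policy for a window from its start timestamp.
static String windowStoragePolicy(long now, long windowStartMillis,
    long hotWindowAgeMillis, long warmWindowAgeMillis) {
  if (windowStartMillis >= now - hotWindowAgeMillis) {
    return "ALL_SSD"; // HOT window
  } else if (windowStartMillis >= now - warmWindowAgeMillis) {
    return "ONE_SSD"; // WARM window
  }
  return "HOT"; // COLD window: no cold age is configured, everything older is cold
}
```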
## Example configuration
| Configuration Key | Value | Note |
|:---|:---:|:---|
| hbase.hstore.compaction.date.tiered.storage.policy.enable | true | Whether to use a storage policy per window. Default is false. |
| hbase.hstore.compaction.date.tiered.hot.window.age.millis | 3600000 | Hot window age in milliseconds. |
| hbase.hstore.compaction.date.tiered.hot.window.storage.policy | ALL_SSD | Storage policy for hot windows; must be an HDFS storage policy name. |
| hbase.hstore.compaction.date.tiered.warm.window.age.millis | 20600000 | Warm window age in milliseconds. |
| hbase.hstore.compaction.date.tiered.warm.window.storage.policy | ONE_SSD | Storage policy for warm windows. |
| hbase.hstore.compaction.date.tiered.cold.window.storage.policy | HOT | Storage policy for cold windows. |
The original date tiered compaction configuration keys keep their meaning, so compatibility is
maintained. If `hbase.hstore.compaction.date.tiered.storage.policy.enable = false`, DTCP
follows the original logic, unchanged.
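For illustration, the same keys can also be set programmatically on an HBase `Configuration`,
much like the unit test added by this change does; a minimal sketch using the example values
from the table above (the class name is hypothetical):
```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class DtcpStoragePolicyConfigExample {
  public static Configuration build() {
    Configuration conf = HBaseConfiguration.create();
    // Turn on per-window storage policies for DTCP (off by default).
    conf.setBoolean("hbase.hstore.compaction.date.tiered.storage.policy.enable", true);
    // Windows younger than 1 hour are HOT; their compaction output goes to ALL_SSD.
    conf.setLong("hbase.hstore.compaction.date.tiered.hot.window.age.millis", 3600000L);
    conf.set("hbase.hstore.compaction.date.tiered.hot.window.storage.policy", "ALL_SSD");
    // Windows younger than ~5.7 hours (but not HOT) are WARM; output goes to ONE_SSD.
    conf.setLong("hbase.hstore.compaction.date.tiered.warm.window.age.millis", 20600000L);
    conf.set("hbase.hstore.compaction.date.tiered.warm.window.storage.policy", "ONE_SSD");
    // Anything older is COLD and keeps the HDFS default policy (HOT, i.e. DISK x3).
    conf.set("hbase.hstore.compaction.date.tiered.cold.window.storage.policy", "HOT");
    return conf;
  }
}
```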
## Storage strategy
HDFS provides the following storage policies; for details, see
https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/ArchivalStorage.html

|Policy ID | Policy Name | Block Placement (3 replicas)|
|:---|:---|:---|
|15|Lazy_Persist|RAM_DISK: 1, DISK: 2|
|12|All_SSD|SSD: 3|
|10|One_SSD|SSD: 1, DISK: 2|
|7|Hot (default)|DISK: 3|
|5|Warm|DISK: 1, ARCHIVE: 2|
|2|Cold|ARCHIVE: 3|
Date Tiered Compaction (DTCP) can output multiple storefiles, and we want those storefiles to
carry different HDFS storage policies. To achieve this, DateTieredMultiFileWriter creates a
separate StoreFileWriter with the appropriate storage policy for each window, as sketched below.
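Concretely, the append path of `DateTieredMultiFileWriter` looks up the window a cell falls
into and lazily creates a writer bound to that window's policy. A condensed sketch of the logic
this change adds (it assumes the `lowerBoundary2Writer`, `lowerBoundariesPolicies` and
`writerFactory` fields of that class):
```java
// Pick the window whose lower boundary is <= the cell timestamp.
Map.Entry<Long, StoreFileWriter> entry =
    lowerBoundary2Writer.floorEntry(cell.getTimestamp());
StoreFileWriter writer = entry.getValue();
if (writer == null) {
  String policy = lowerBoundariesPolicies.get(entry.getKey());
  // If the window has a storage policy, create a writer bound to it;
  // otherwise fall back to the plain writer.
  writer = policy != null
      ? writerFactory.createWriterWithStoragePolicy(policy)
      : writerFactory.createWriter();
  lowerBoundary2Writer.put(entry.getKey(), writer);
}
writer.append(cell);
```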
## Why use different child tmp dir
Before StoreFileWriter writes a storefile, we can create per-policy directories under the
region's tmp directory and set the corresponding storage policy on each of them; StoreFileWriter
can then write its files into the matching directory.
This is needed because HDFS does not support creating a file with a storage policy parameter
(see https://issues.apache.org/jira/browse/HDFS-13209; not supported on Hadoop 2.x), and a
storage policy cannot be set on a file or directory path that does not exist yet. An alternative
would be to wait until the compaction ends, when the storefile path exists, and set the storage
policy on the storefile then.
However, if a file is written first and the storage policy is set afterwards, the actual
storage location of the data does not match the policy. For example, if the three replicas of a
file (one block) are written to HDD and the storage policy is then set to ALL_SSD, the data
block is not moved to SSD immediately: HDFS does not move file content across block volumes on
rename, and moving data requires the HDFS mover tool or HDFS SPS (for details, see
https://issues.apache.org/jira/browse/HDFS-10285). To avoid moving data blocks at the HDFS
level, we instead set the desired storage policy on the file's parent directory before writing
the data. A new file automatically inherits the storage policy of its parent directory, so the
data is written to the correct disk type from the start and no later data movement is needed.
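A minimal, standalone sketch of this directory-first pattern; the path and class name are
illustrative, and the plain HDFS `FileSystem#setStoragePolicy` API (Hadoop 2.8+) stands in for
the `CommonFSUtils` wrapper the patch actually uses:
```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class DirectoryFirstPolicyExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    // Create the per-policy child dir under the region tmp dir first and set
    // the policy on the directory, not on the (not yet existing) file.
    Path tmpDir = new Path("/hbase/data/ns/tbl/region/.tmp/storage_policy_ALL_SSD");
    fs.mkdirs(tmpDir);
    fs.setStoragePolicy(tmpDir, "ALL_SSD");
    // A file created under the dir inherits its policy, so its blocks land on
    // the right media at write time and no HDFS mover run is needed.
    try (FSDataOutputStream out = fs.create(new Path(tmpDir, "storefile"))) {
      out.writeBytes("...");
    }
    // Renaming the finished storefile into the store dir later keeps its blocks
    // in place; HStore then re-applies the policy on the final file path.
  }
}
```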
Over time, data that was HOT becomes WARM / COLD and no longer belongs to a HOT window. When a
later compaction runs, the data is automatically downgraded, for example from SSD to HDD: the
compaction writes a new file (onto HDD) and deletes the old file (on SSD).


@ -64,6 +64,11 @@ public final class HConstants {
*/
public static final String RECOVERED_HFILES_DIR = "recovered.hfiles";
/**
* Date Tiered Compaction tmp dir prefix name if use storage policy
*/
public static final String STORAGE_POLICY_PREFIX = "storage_policy_";
/**
* The first four bytes of Hadoop RPC connections
*/


@ -43,6 +43,10 @@ public abstract class AbstractMultiFileWriter implements CellSink, ShipperListen
public interface WriterFactory {
public StoreFileWriter createWriter() throws IOException;
default StoreFileWriter createWriterWithStoragePolicy(String fileStoragePolicy)
throws IOException {
return createWriter();
};
}
/**


@ -38,15 +38,20 @@ public class DateTieredMultiFileWriter extends AbstractMultiFileWriter {
private final boolean needEmptyFile;
private final Map<Long, String> lowerBoundariesPolicies;
/**
* @param lowerBoundariesPolicies each window to storage policy map.
* @param needEmptyFile whether need to create an empty store file if we haven't written out
* anything.
*/
public DateTieredMultiFileWriter(List<Long> lowerBoundaries, boolean needEmptyFile) {
public DateTieredMultiFileWriter(List<Long> lowerBoundaries,
Map<Long, String> lowerBoundariesPolicies, boolean needEmptyFile) {
for (Long lowerBoundary : lowerBoundaries) {
lowerBoundary2Writer.put(lowerBoundary, null);
}
this.needEmptyFile = needEmptyFile;
this.lowerBoundariesPolicies = lowerBoundariesPolicies;
}
@Override
@ -54,7 +59,12 @@ public class DateTieredMultiFileWriter extends AbstractMultiFileWriter {
Map.Entry<Long, StoreFileWriter> entry = lowerBoundary2Writer.floorEntry(cell.getTimestamp());
StoreFileWriter writer = entry.getValue();
if (writer == null) {
writer = writerFactory.createWriter();
String lowerBoundaryStoragePolicy = lowerBoundariesPolicies.get(entry.getKey());
if (lowerBoundaryStoragePolicy != null) {
writer = writerFactory.createWriterWithStoragePolicy(lowerBoundaryStoragePolicy);
} else {
writer = writerFactory.createWriter();
}
lowerBoundary2Writer.put(entry.getKey(), writer);
}
writer.append(cell);


@ -93,7 +93,9 @@ public class DateTieredStoreEngine extends StoreEngine<DefaultStoreFlusher,
public List<Path> compact(ThroughputController throughputController, User user)
throws IOException {
if (request instanceof DateTieredCompactionRequest) {
return compactor.compact(request, ((DateTieredCompactionRequest) request).getBoundaries(),
DateTieredCompactionRequest compactionRequest = (DateTieredCompactionRequest) request;
return compactor.compact(request, compactionRequest.getBoundaries(),
compactionRequest.getBoundariesPolicies(),
throughputController, user);
} else {
throw new IllegalArgumentException("DateTieredCompactionRequest is expected. Actual: "


@ -101,6 +101,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDes
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ReflectionUtils;
@ -1148,7 +1149,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
boolean isCompaction, boolean includeMVCCReadpoint, boolean includesTag,
boolean shouldDropBehind) throws IOException {
return createWriterInTmp(maxKeyCount, compression, isCompaction, includeMVCCReadpoint,
includesTag, shouldDropBehind, -1);
includesTag, shouldDropBehind, -1, HConstants.EMPTY_STRING);
}
/**
@ -1162,7 +1163,8 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
// compaction
public StoreFileWriter createWriterInTmp(long maxKeyCount, Compression.Algorithm compression,
boolean isCompaction, boolean includeMVCCReadpoint, boolean includesTag,
boolean shouldDropBehind, long totalCompactedFilesSize) throws IOException {
boolean shouldDropBehind, long totalCompactedFilesSize, String fileStoragePolicy)
throws IOException {
// creating new cache config for each new writer
final CacheConfig writerCacheConf = new CacheConfig(cacheConf);
if (isCompaction) {
@ -1219,7 +1221,8 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
.withFavoredNodes(favoredNodes)
.withFileContext(hFileContext)
.withShouldDropCacheBehind(shouldDropBehind)
.withCompactedFilesSupplier(this::getCompactedFiles);
.withCompactedFilesSupplier(this::getCompactedFiles)
.withFileStoragePolicy(fileStoragePolicy);
return builder.build();
}
@ -1540,6 +1543,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
Collection<HStoreFile> filesToCompact, User user, long compactionStartTime,
List<Path> newFiles) throws IOException {
// Do the steps necessary to complete the compaction.
setStoragePolicyFromFileName(newFiles);
List<HStoreFile> sfs = moveCompactedFilesIntoPlace(cr, newFiles, user);
writeCompactionWalRecord(filesToCompact, sfs);
replaceStoreFiles(filesToCompact, sfs);
@ -1569,6 +1573,18 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
return sfs;
}
// Set the correct storage policy based on the DTCP tmp dir name of the new files.
// Renaming a file does not change its storage policy.
private void setStoragePolicyFromFileName(List<Path> newFiles) throws IOException {
String prefix = HConstants.STORAGE_POLICY_PREFIX;
for (Path newFile : newFiles) {
if (newFile.getParent().getName().startsWith(prefix)) {
CommonFSUtils.setStoragePolicy(fs.getFileSystem(), newFile,
newFile.getParent().getName().substring(prefix.length()));
}
}
}
private List<HStoreFile> moveCompactedFilesIntoPlace(CompactionRequestImpl cr,
List<Path> newFiles, User user) throws IOException {
List<HStoreFile> sfs = new ArrayList<>(newFiles.size());


@ -66,6 +66,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.base.Strings;
import org.apache.hbase.thirdparty.com.google.common.collect.SetMultimap;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
@ -437,6 +438,7 @@ public class StoreFileWriter implements CellSink, ShipperListener {
private HFileContext fileContext;
private boolean shouldDropCacheBehind;
private Supplier<Collection<HStoreFile>> compactedFilesSupplier = () -> Collections.emptySet();
private String fileStoragePolicy;
public Builder(Configuration conf, CacheConfig cacheConf,
FileSystem fs) {
@ -518,6 +520,11 @@ public class StoreFileWriter implements CellSink, ShipperListener {
return this;
}
public Builder withFileStoragePolicy(String fileStoragePolicy) {
this.fileStoragePolicy = fileStoragePolicy;
return this;
}
/**
* Create a store file writer. Client is responsible for closing file when
* done. If metadata, add BEFORE closing using
@ -547,6 +554,20 @@ public class StoreFileWriter implements CellSink, ShipperListener {
CommonFSUtils.setStoragePolicy(this.fs, dir, policyName);
if (filePath == null) {
// The stored file and related blocks will use the directory-based StoragePolicy.
// Because HDFS DistributedFileSystem does not support creating files with a storage policy
// before version 3.3.0 (see HDFS-13209), a child dir is used here so that stored files
// satisfy the specific storage policy at write time, avoiding later data movement.
// We don't want to change the whole temp dir to 'fileStoragePolicy'.
if (!Strings.isNullOrEmpty(fileStoragePolicy)) {
dir = new Path(dir, HConstants.STORAGE_POLICY_PREFIX + fileStoragePolicy);
if (!fs.exists(dir)) {
HRegionFileSystem.mkdirs(fs, conf, dir);
LOG.info(
"Create tmp dir " + dir.toString() + " with storage policy: " + fileStoragePolicy);
}
CommonFSUtils.setStoragePolicy(this.fs, dir, fileStoragePolicy);
}
filePath = getUniqueFile(fs, dir);
if (!BloomFilterFactory.isGeneralBloomEnabled(conf)) {
bloomType = BloomType.NONE;


@ -53,6 +53,12 @@ public abstract class AbstractMultiOutputCompactor<T extends AbstractMultiFileWr
public StoreFileWriter createWriter() throws IOException {
return createTmpWriter(fd, shouldDropBehind);
}
@Override
public StoreFileWriter createWriterWithStoragePolicy(String fileStoragePolicy)
throws IOException {
return createTmpWriter(fd, shouldDropBehind, fileStoragePolicy);
}
};
// Prepare multi-writer, and perform the compaction using scanner and writer.
// It is ok here if storeScanner is null.


@ -92,6 +92,20 @@ public class CompactionConfiguration {
private static final Class<? extends CompactionWindowFactory>
DEFAULT_DATE_TIERED_COMPACTION_WINDOW_FACTORY_CLASS = ExponentialCompactionWindowFactory.class;
public static final String DATE_TIERED_STORAGE_POLICY_ENABLE_KEY =
"hbase.hstore.compaction.date.tiered.storage.policy.enable";
public static final String DATE_TIERED_HOT_WINDOW_AGE_MILLIS_KEY =
"hbase.hstore.compaction.date.tiered.hot.window.age.millis";
public static final String DATE_TIERED_HOT_WINDOW_STORAGE_POLICY_KEY =
"hbase.hstore.compaction.date.tiered.hot.window.storage.policy";
public static final String DATE_TIERED_WARM_WINDOW_AGE_MILLIS_KEY =
"hbase.hstore.compaction.date.tiered.warm.window.age.millis";
public static final String DATE_TIERED_WARM_WINDOW_STORAGE_POLICY_KEY =
"hbase.hstore.compaction.date.tiered.warm.window.storage.policy";
/** Windows older than warm age belong to COLD_WINDOW **/
public static final String DATE_TIERED_COLD_WINDOW_STORAGE_POLICY_KEY =
"hbase.hstore.compaction.date.tiered.cold.window.storage.policy";
Configuration conf;
StoreConfigInformation storeConfigInfo;
@ -113,6 +127,12 @@ public class CompactionConfiguration {
private final String compactionPolicyForDateTieredWindow;
private final boolean dateTieredSingleOutputForMinorCompaction;
private final String dateTieredCompactionWindowFactory;
private final boolean dateTieredStoragePolicyEnable;
private long hotWindowAgeMillis;
private long warmWindowAgeMillis;
private String hotWindowStoragePolicy;
private String warmWindowStoragePolicy;
private String coldWindowStoragePolicy;
CompactionConfiguration(Configuration conf, StoreConfigInformation storeConfigInfo) {
this.conf = conf;
@ -147,6 +167,13 @@ public class CompactionConfiguration {
this.dateTieredCompactionWindowFactory = conf.get(
DATE_TIERED_COMPACTION_WINDOW_FACTORY_CLASS_KEY,
DEFAULT_DATE_TIERED_COMPACTION_WINDOW_FACTORY_CLASS.getName());
// for Heterogeneous Storage
dateTieredStoragePolicyEnable = conf.getBoolean(DATE_TIERED_STORAGE_POLICY_ENABLE_KEY, false);
hotWindowAgeMillis = conf.getLong(DATE_TIERED_HOT_WINDOW_AGE_MILLIS_KEY, 86400000L);
hotWindowStoragePolicy = conf.get(DATE_TIERED_HOT_WINDOW_STORAGE_POLICY_KEY, "ALL_SSD");
warmWindowAgeMillis = conf.getLong(DATE_TIERED_WARM_WINDOW_AGE_MILLIS_KEY, 604800000L);
warmWindowStoragePolicy = conf.get(DATE_TIERED_WARM_WINDOW_STORAGE_POLICY_KEY, "ONE_SSD");
coldWindowStoragePolicy = conf.get(DATE_TIERED_COLD_WINDOW_STORAGE_POLICY_KEY, "HOT");
LOG.info(toString());
}
@ -293,4 +320,28 @@ public class CompactionConfiguration {
public String getDateTieredCompactionWindowFactory() {
return dateTieredCompactionWindowFactory;
}
public boolean isDateTieredStoragePolicyEnable() {
return dateTieredStoragePolicyEnable;
}
public long getHotWindowAgeMillis() {
return hotWindowAgeMillis;
}
public long getWarmWindowAgeMillis() {
return warmWindowAgeMillis;
}
public String getHotWindowStoragePolicy() {
return hotWindowStoragePolicy.trim().toUpperCase();
}
public String getWarmWindowStoragePolicy() {
return warmWindowStoragePolicy.trim().toUpperCase();
}
public String getColdWindowStoragePolicy() {
return coldWindowStoragePolicy.trim().toUpperCase();
}
}


@ -268,7 +268,15 @@ public abstract class Compactor<T extends CellSink> {
// See HBASE-8166, HBASE-12600, and HBASE-13389.
return store
.createWriterInTmp(fd.maxKeyCount, this.compactionCompression, true, fd.maxMVCCReadpoint > 0,
fd.maxTagsLength > 0, shouldDropBehind, fd.totalCompactedFilesSize);
fd.maxTagsLength > 0, shouldDropBehind, fd.totalCompactedFilesSize,
HConstants.EMPTY_STRING);
}
protected final StoreFileWriter createTmpWriter(FileDetails fd, boolean shouldDropBehind,
String fileStoragePolicy) throws IOException {
return store
.createWriterInTmp(fd.maxKeyCount, this.compactionCompression, true, fd.maxMVCCReadpoint > 0,
fd.maxTagsLength > 0, shouldDropBehind, fd.totalCompactedFilesSize, fileStoragePolicy);
}
private ScanInfo preCompactScannerOpen(CompactionRequestImpl request, ScanType scanType,


@ -22,7 +22,9 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import org.apache.hadoop.conf.Configuration;
@ -198,8 +200,10 @@ public class DateTieredCompactionPolicy extends SortedCompactionPolicy {
public CompactionRequestImpl selectMajorCompaction(ArrayList<HStoreFile> candidateSelection) {
long now = EnvironmentEdgeManager.currentTime();
List<Long> boundaries = getCompactBoundariesForMajor(candidateSelection, now);
Map<Long, String> boundariesPolicies = getBoundariesStoragePolicyForMajor(boundaries, now);
return new DateTieredCompactionRequest(candidateSelection,
this.getCompactBoundariesForMajor(candidateSelection, now));
boundaries, boundariesPolicies);
}
/**
@ -253,7 +257,7 @@ public class DateTieredCompactionPolicy extends SortedCompactionPolicy {
LOG.debug("Processing files: " + fileList + " for window: " + window);
}
DateTieredCompactionRequest request = generateCompactionRequest(fileList, window,
mayUseOffPeak, mayBeStuck, minThreshold);
mayUseOffPeak, mayBeStuck, minThreshold, now);
if (request != null) {
return request;
}
@ -265,8 +269,8 @@ public class DateTieredCompactionPolicy extends SortedCompactionPolicy {
}
private DateTieredCompactionRequest generateCompactionRequest(ArrayList<HStoreFile> storeFiles,
CompactionWindow window, boolean mayUseOffPeak, boolean mayBeStuck, int minThreshold)
throws IOException {
CompactionWindow window, boolean mayUseOffPeak, boolean mayBeStuck, int minThreshold,
long now) throws IOException {
// The files has to be in ascending order for ratio-based compaction to work right
// and removeExcessFile to exclude youngest files.
Collections.reverse(storeFiles);
@ -281,8 +285,11 @@ public class DateTieredCompactionPolicy extends SortedCompactionPolicy {
boolean singleOutput = storeFiles.size() != storeFileSelection.size() ||
comConf.useDateTieredSingleOutputForMinorCompaction();
List<Long> boundaries = getCompactionBoundariesForMinor(window, singleOutput);
// we want to generate policy to boundaries for minor compaction
Map<Long, String> boundaryPolicyMap =
getBoundariesStoragePolicyForMinor(singleOutput, window, now);
DateTieredCompactionRequest result = new DateTieredCompactionRequest(storeFileSelection,
boundaries);
boundaries, boundaryPolicyMap);
return result;
}
return null;
@ -334,4 +341,39 @@ public class DateTieredCompactionPolicy extends SortedCompactionPolicy {
return Long.MIN_VALUE;
}
}
private Map<Long, String> getBoundariesStoragePolicyForMinor(boolean singleOutput,
CompactionWindow window, long now) {
Map<Long, String> boundariesPolicy = new HashMap<>();
if (!comConf.isDateTieredStoragePolicyEnable()) {
return boundariesPolicy;
}
String windowStoragePolicy = getWindowStoragePolicy(now, window.startMillis());
if (singleOutput) {
boundariesPolicy.put(Long.MIN_VALUE, windowStoragePolicy);
} else {
boundariesPolicy.put(window.startMillis(), windowStoragePolicy);
}
return boundariesPolicy;
}
private Map<Long, String> getBoundariesStoragePolicyForMajor(List<Long> boundaries, long now) {
Map<Long, String> boundariesPolicy = new HashMap<>();
if (!comConf.isDateTieredStoragePolicyEnable()) {
return boundariesPolicy;
}
for (Long startTs : boundaries) {
boundariesPolicy.put(startTs, getWindowStoragePolicy(now, startTs));
}
return boundariesPolicy;
}
private String getWindowStoragePolicy(long now, long windowStartMillis) {
if (windowStartMillis >= (now - comConf.getHotWindowAgeMillis())) {
return comConf.getHotWindowStoragePolicy();
} else if (windowStartMillis >= (now - comConf.getWarmWindowAgeMillis())) {
return comConf.getWarmWindowStoragePolicy();
}
return comConf.getColdWindowStoragePolicy();
}
}


@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver.compactions;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.yetus.audience.InterfaceAudience;
@ -28,18 +29,27 @@ import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
public class DateTieredCompactionRequest extends CompactionRequestImpl {
private List<Long> boundaries;
/** window start boundary to window storage policy map **/
private Map<Long, String> boundariesPolicies;
public DateTieredCompactionRequest(Collection<HStoreFile> files, List<Long> boundaryList) {
public DateTieredCompactionRequest(Collection<HStoreFile> files, List<Long> boundaryList,
Map<Long, String> boundaryPolicyMap) {
super(files);
boundaries = boundaryList;
boundariesPolicies = boundaryPolicyMap;
}
public List<Long> getBoundaries() {
return boundaries;
}
public Map<Long, String> getBoundariesPolicies() {
return boundariesPolicies;
}
@Override
public String toString() {
return super.toString() + " boundaries=" + Arrays.toString(boundaries.toArray());
return super.toString() + " boundaries=" + Arrays.toString(boundaries.toArray())
+ " boundariesPolicies="+boundariesPolicies.toString();
}
}


@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.regionserver.compactions;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import org.apache.hadoop.conf.Configuration;
@ -55,6 +56,7 @@ public class DateTieredCompactor extends AbstractMultiOutputCompactor<DateTiered
}
public List<Path> compact(final CompactionRequestImpl request, final List<Long> lowerBoundaries,
final Map<Long, String> lowerBoundariesPolicies,
ThroughputController throughputController, User user) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Executing compaction with " + lowerBoundaries.size()
@ -68,6 +70,7 @@ public class DateTieredCompactor extends AbstractMultiOutputCompactor<DateTiered
public DateTieredMultiFileWriter createWriter(InternalScanner scanner, FileDetails fd,
boolean shouldDropBehind) throws IOException {
DateTieredMultiFileWriter writer = new DateTieredMultiFileWriter(lowerBoundaries,
lowerBoundariesPolicies,
needEmptyFile(request));
initMultiWriter(writer, scanner, fd, shouldDropBehind);
return writer;


@ -23,6 +23,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactionPolicy;
import org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactionRequest;
@ -57,12 +58,21 @@ public class AbstractTestDateTieredCompactionPolicy extends TestCompactionPolicy
protected void compactEquals(long now, ArrayList<HStoreFile> candidates, long[] expectedFileSizes,
long[] expectedBoundaries, boolean isMajor, boolean toCompact) throws IOException {
DateTieredCompactionRequest request = getRequest(now, candidates, isMajor, toCompact);
List<HStoreFile> actual = Lists.newArrayList(request.getFiles());
assertEquals(Arrays.toString(expectedFileSizes), Arrays.toString(getSizes(actual)));
assertEquals(Arrays.toString(expectedBoundaries),
Arrays.toString(request.getBoundaries().toArray()));
}
private DateTieredCompactionRequest getRequest(long now, ArrayList<HStoreFile> candidates,
boolean isMajor, boolean toCompact) throws IOException {
ManualEnvironmentEdge timeMachine = new ManualEnvironmentEdge();
EnvironmentEdgeManager.injectEdge(timeMachine);
timeMachine.setValue(now);
DateTieredCompactionRequest request;
DateTieredCompactionPolicy policy =
(DateTieredCompactionPolicy) store.storeEngine.getCompactionPolicy();
(DateTieredCompactionPolicy) store.storeEngine.getCompactionPolicy();
if (isMajor) {
for (HStoreFile file : candidates) {
((MockHStoreFile) file).setIsMajor(true);
@ -72,11 +82,18 @@ public class AbstractTestDateTieredCompactionPolicy extends TestCompactionPolicy
} else {
assertEquals(toCompact, policy.needsCompaction(candidates, ImmutableList.of()));
request =
(DateTieredCompactionRequest) policy.selectMinorCompaction(candidates, false, false);
(DateTieredCompactionRequest) policy.selectMinorCompaction(candidates, false, false);
}
return request;
}
protected void compactEqualsStoragePolicy(long now, ArrayList<HStoreFile> candidates,
Map<Long, String> expectedBoundariesPolicies, boolean isMajor, boolean toCompact)
throws IOException {
DateTieredCompactionRequest request = getRequest(now, candidates, isMajor, toCompact);
Map<Long, String> boundariesPolicies = request.getBoundariesPolicies();
for (Map.Entry<Long, String> entry : expectedBoundariesPolicies.entrySet()) {
assertEquals(entry.getValue(), boundariesPolicies.get(entry.getKey()));
}
List<HStoreFile> actual = Lists.newArrayList(request.getFiles());
assertEquals(Arrays.toString(expectedFileSizes), Arrays.toString(getSizes(actual)));
assertEquals(Arrays.toString(expectedBoundaries),
Arrays.toString(request.getBoundaries().toArray()));
}
}


@ -0,0 +1,189 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
import org.apache.hadoop.hbase.regionserver.compactions.ExponentialCompactionWindowFactory;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ RegionServerTests.class, SmallTests.class })
public class TestDateTieredCompactionPolicyHeterogeneousStorage
extends AbstractTestDateTieredCompactionPolicy {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestDateTieredCompactionPolicyHeterogeneousStorage.class);
public static final String HOT_WINDOW_SP = "ALL_SSD";
public static final String WARM_WINDOW_SP = "ONE_SSD";
public static final String COLD_WINDOW_SP = "HOT";
@Override
protected void config() {
super.config();
// Set up policy
conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY,
"org.apache.hadoop.hbase.regionserver.DateTieredStoreEngine");
conf.setLong(CompactionConfiguration.DATE_TIERED_MAX_AGE_MILLIS_KEY, 100);
conf.setLong(CompactionConfiguration.DATE_TIERED_INCOMING_WINDOW_MIN_KEY, 3);
conf.setLong(ExponentialCompactionWindowFactory.BASE_WINDOW_MILLIS_KEY, 6);
conf.setInt(ExponentialCompactionWindowFactory.WINDOWS_PER_TIER_KEY, 4);
conf.setBoolean(CompactionConfiguration.DATE_TIERED_SINGLE_OUTPUT_FOR_MINOR_COMPACTION_KEY,
false);
// Special settings for compaction policy per window
this.conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 2);
this.conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 12);
this.conf.setFloat(CompactionConfiguration.HBASE_HSTORE_COMPACTION_RATIO_KEY, 1.2F);
conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 20);
conf.setLong(HConstants.MAJOR_COMPACTION_PERIOD, 5);
// Set Storage Policy for different type window
conf.setBoolean(CompactionConfiguration.DATE_TIERED_STORAGE_POLICY_ENABLE_KEY, true);
conf.setLong(CompactionConfiguration.DATE_TIERED_HOT_WINDOW_AGE_MILLIS_KEY, 6);
conf.set(CompactionConfiguration.DATE_TIERED_HOT_WINDOW_STORAGE_POLICY_KEY, HOT_WINDOW_SP);
conf.setLong(CompactionConfiguration.DATE_TIERED_WARM_WINDOW_AGE_MILLIS_KEY, 12);
conf.set(CompactionConfiguration.DATE_TIERED_WARM_WINDOW_STORAGE_POLICY_KEY, WARM_WINDOW_SP);
conf.set(CompactionConfiguration.DATE_TIERED_COLD_WINDOW_STORAGE_POLICY_KEY, COLD_WINDOW_SP);
}
/**
* Test for minor compaction of incoming window.
* The incoming window start ts >= now - hot age, so it is a HOT window and will use HOT_WINDOW_SP.
* @throws IOException with error
*/
@Test
public void testIncomingWindowHot() throws IOException {
long[] minTimestamps = new long[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 };
long[] maxTimestamps = new long[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15 };
long[] sizes = new long[] { 30, 31, 32, 33, 34, 20, 21, 22, 23, 24, 25, 10, 11, 12, 13 };
Map<Long, String> expected = new HashMap<>();
// expected DateTieredCompactionRequest boundaries = { Long.MIN_VALUE, 12 }
// test whether DateTieredCompactionRequest boundariesPolicies matches expected
expected.put(12L, HOT_WINDOW_SP);
compactEqualsStoragePolicy(16, sfCreate(minTimestamps, maxTimestamps, sizes),
expected, false, true);
}
/**
* Test for a non-incoming window where
* now - hot age > window start >= now - warm age,
* so the window is a WARM window and will use WARM_WINDOW_SP.
* @throws IOException with error
*/
@Test
public void testNotIncomingWindowWarm() throws IOException {
long[] minTimestamps = new long[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 };
long[] maxTimestamps = new long[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13 };
long[] sizes = new long[] { 30, 31, 32, 33, 34, 20, 21, 22, 23, 24, 25, 10, 11 };
Map<Long, String> expected = new HashMap<>();
// expected DateTieredCompactionRequest boundaries = { Long.MIN_VALUE, 6 }
expected.put(6L, WARM_WINDOW_SP);
compactEqualsStoragePolicy(16, sfCreate(minTimestamps, maxTimestamps, sizes),
expected, false, true);
}
/**
* Test for a non-incoming window whose
* start ts >= now - hot age,
* so the window is a HOT window and will use HOT_WINDOW_SP.
* @throws IOException with error
*/
@Test
public void testNotIncomingWindowAndIsHot() throws IOException {
long[] minTimestamps = new long[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 };
long[] maxTimestamps = new long[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13 };
long[] sizes = new long[] { 30, 31, 32, 33, 34, 20, 21, 22, 23, 24, 25, 10, 11 };
Map<Long, String> expected = new HashMap<>();
// expected DateTieredCompactionRequest boundaries = { Long.MIN_VALUE, 6 }
expected.put(6L, HOT_WINDOW_SP);
compactEqualsStoragePolicy(12, sfCreate(minTimestamps, maxTimestamps, sizes),
expected, false, true);
}
/**
* Test for not incoming window.
* COLD window start timestamp < now - warm age, so use COLD_WINDOW_SP
* @throws IOException with error
*/
@Test
public void testColdWindow() throws IOException {
long[] minTimestamps = new long[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 };
long[] maxTimestamps = new long[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12 };
long[] sizes = new long[] { 30, 31, 32, 33, 34, 20, 21, 22, 23, 24, 25, 10 };
Map<Long, String> expected = new HashMap<>();
// expected DateTieredCompactionRequest boundaries = { Long.MIN_VALUE, 6 }
expected.put(6L, COLD_WINDOW_SP);
compactEqualsStoragePolicy(22, sfCreate(minTimestamps, maxTimestamps, sizes),
expected, false, true);
}
/**
* Test for a non-incoming window where not all hfiles are selected to compact.
* Exploring logic is applied to the non-incoming window and more than one hfile is left in it,
* so single output for minor compaction is used and the boundaries contain only Long.MIN_VALUE.
* @throws IOException with error
*/
@Test
public void testRatioT0() throws IOException {
long[] minTimestamps = new long[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 };
long[] maxTimestamps = new long[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12 };
long[] sizes = new long[] { 30, 31, 32, 33, 34, 20, 21, 22, 280, 23, 24, 1 };
Map<Long, String> expected = new HashMap<>();
// window start = 6, expected DateTieredCompactionRequest boundaries = { Long.MIN_VALUE }
expected.put(Long.MIN_VALUE, WARM_WINDOW_SP);
compactEqualsStoragePolicy(16, sfCreate(minTimestamps, maxTimestamps, sizes),
expected, false, true);
}
/**
* Test for major compaction. It compacts all files and creates multiple output files
* with different window storage policies.
* @throws IOException with error
*/
@Test
public void testMajorCompation() throws IOException {
long[] minTimestamps = new long[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 };
long[] maxTimestamps = new long[] { 44, 60, 61, 96, 100, 104, 105, 106, 113, 145, 157 };
long[] sizes = new long[] { 0, 50, 51, 40, 41, 42, 33, 30, 31, 2, 1 };
Map<Long, String> expected = new HashMap<>();
expected.put(Long.MIN_VALUE, COLD_WINDOW_SP);
expected.put(24L, COLD_WINDOW_SP);
expected.put(48L, COLD_WINDOW_SP);
expected.put(72L, COLD_WINDOW_SP);
expected.put(96L, COLD_WINDOW_SP);
expected.put(120L, COLD_WINDOW_SP);
expected.put(144L, COLD_WINDOW_SP);
expected.put(150L, WARM_WINDOW_SP);
expected.put(156L, HOT_WINDOW_SP);
compactEquals(161, sfCreate(minTimestamps, maxTimestamps, sizes),
new long[] { 0, 50, 51, 40, 41, 42, 33, 30, 31, 2, 1 },
new long[] { Long.MIN_VALUE, 24, 48, 72, 96, 120, 144, 150, 156 }, true, true);
compactEqualsStoragePolicy(161, sfCreate(minTimestamps, maxTimestamps, sizes),
expected,true, true);
}
}


@ -24,12 +24,14 @@ import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.OptionalLong;
import org.apache.hadoop.conf.Configuration;
@ -111,7 +113,7 @@ public class TestDateTieredCompactor {
when(store.createWriterInTmp(anyLong(), any(), anyBoolean(),
anyBoolean(), anyBoolean(), anyBoolean())).thenAnswer(writers);
when(store.createWriterInTmp(anyLong(), any(), anyBoolean(),
anyBoolean(), anyBoolean(), anyBoolean(), anyLong())).thenAnswer(writers);
anyBoolean(), anyBoolean(), anyBoolean(), anyLong(), anyString())).thenAnswer(writers);
when(store.getComparator()).thenReturn(CellComparatorImpl.COMPARATOR);
OptionalLong maxSequenceId = StoreUtils.getMaxSequenceIdInList(storefiles);
when(store.getMaxSequenceId()).thenReturn(maxSequenceId);
@ -140,7 +142,8 @@ public class TestDateTieredCompactor {
HStoreFile sf2 = createDummyStoreFile(2L);
DateTieredCompactor dtc = createCompactor(writers, input, Arrays.asList(sf1, sf2));
List<Path> paths = dtc.compact(new CompactionRequestImpl(Arrays.asList(sf1)),
boundaries.subList(0, boundaries.size() - 1), NoLimitThroughputController.INSTANCE, null);
boundaries.subList(0, boundaries.size() - 1), new HashMap<Long, String>(),
NoLimitThroughputController.INSTANCE, null);
writers.verifyKvs(output, allFiles, boundaries);
if (allFiles) {
assertEquals(output.length, paths.size());
@ -169,7 +172,7 @@ public class TestDateTieredCompactor {
DateTieredCompactor dtc = createCompactor(writers, new KeyValue[0],
new ArrayList<>(request.getFiles()));
List<Path> paths = dtc.compact(request, Arrays.asList(Long.MIN_VALUE, Long.MAX_VALUE),
NoLimitThroughputController.INSTANCE, null);
new HashMap<Long, String>(), NoLimitThroughputController.INSTANCE, null);
assertEquals(1, paths.size());
List<StoreFileWritersCapture.Writer> dummyWriters = writers.getWriters();
assertEquals(1, dummyWriters.size());


@ -27,6 +27,7 @@ import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isNull;
@ -800,7 +801,7 @@ public class TestStripeCompactionPolicy {
anyBoolean(), anyBoolean(), anyBoolean())).thenAnswer(writers);
when(
store.createWriterInTmp(anyLong(), any(), anyBoolean(),
anyBoolean(), anyBoolean(), anyBoolean(), anyLong())).thenAnswer(writers);
anyBoolean(), anyBoolean(), anyBoolean(), anyLong(), anyString())).thenAnswer(writers);
Configuration conf = HBaseConfiguration.create();
conf.setBoolean("hbase.regionserver.compaction.private.readers", usePrivateReaders);


@ -23,6 +23,7 @@ import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ -211,7 +212,7 @@ public class TestStripeCompactor {
when(store.createWriterInTmp(anyLong(), any(), anyBoolean(),
anyBoolean(), anyBoolean(), anyBoolean())).thenAnswer(writers);
when(store.createWriterInTmp(anyLong(), any(), anyBoolean(),
anyBoolean(), anyBoolean(), anyBoolean(), anyLong())).thenAnswer(writers);
anyBoolean(), anyBoolean(), anyBoolean(), anyLong(), anyString())).thenAnswer(writers);
when(store.getComparator()).thenReturn(CellComparatorImpl.COMPARATOR);
return new StripeCompactor(conf, store) {