diff --git a/dev-support/design-docs/HBASE-24289-Heterogeneous Storage for Date Tiered Compaction.md b/dev-support/design-docs/HBASE-24289-Heterogeneous Storage for Date Tiered Compaction.md new file mode 100644 index 00000000000..1a344554334 --- /dev/null +++ b/dev-support/design-docs/HBASE-24289-Heterogeneous Storage for Date Tiered Compaction.md @@ -0,0 +1,125 @@ + + +# Heterogeneous Storage for Date Tiered Compaction + +## Objective + +Support DateTiredCompaction([HBASE-15181](https://issues.apache.org/jira/browse/HBASE-15181)) + for cold and hot data separation, support different storage policies for different time periods + of data to get better performance, for example, we can configure the data of last 1 month in SSD, + and 1 month ago data was in HDD. + ++ Date Tiered Compaction (DTCP) is based on date tiering (date-aware), we hope to support + the separation of cold and hot data, heterogeneous storage. Set different storage + policies (in HDFS) for data in different time windows. ++ DTCP designs different windows, and we can classify the windows according to + the timestamps of the windows. For example: HOT window, WARM window, COLD window. ++ DTCP divides storefiles into different windows, and performs minor Compaction within + a time window. The storefile generated by Compaction will use the storage strategy of + this window. For example, if a window is a HOT window, the storefile generated by compaction + can be stored on the SSD. There are already WAL and the entire CF support storage policy + (HBASE-12848, HBASE-14061), our goal is to achieve cold and hot separation in one CF or + a region, using different storage policies. + +## Definition of hot and cold data + +Usually the data of the last 3 days can be defined as `HOT data`, hot age = 3 days. + If the written timestamp of the data(Cell) is > (timestamp now - hot age), we think the data is hot data. + Warm age can be defined in the same way. Only one type of data is allowed. + If data timestamp < (now - warm age), we consider it is COLD. + ``` + if timestamp >= (now - hot age) , HOT data + else if timestamp >= (now - warm age), WARM data + else COLD data +``` + +## Time window +When given a time now, it is the time when the compaction occurs. Each window and the size of + the window are automatically calculated by DTCP, and the window boundary is rounded according + to the base size. +Assuming that the base window size is 1 hour, and each tier has 3 windows, the current time is + between 12:00 and 13:00. We have defined three types of winow (`HOT, WARM, COLD`). The type of + winodw is determined by the timestamp at the beginning of the window and the timestamp now. +As shown in the figure 1 below, the type of each window can be determined by the age range + (hot / warm / cold) where (now - window.startTimestamp) falls. Cold age can not need to be set, + the default Long.MAX, meaning that the window with a very early time stamp belongs to the + cold window. +![figure 1](https://raw.githubusercontent.com/pengmq1/images/master/F1-HDTCP.png "figure 1") + +## Example configuration + +| Configuration Key | value | Note | +|:---|:---:|:---| +|hbase.hstore.compaction.date.tiered.storage.policy.enable|true|if or not use storage policy for window. Default is false| +|hbase.hstore.compaction.date.tiered.hot.window.age.millis|3600000|hot data age +|hbase.hstore.compaction.date.tiered.hot.window.storage.policy|ALL_SSD|hot data storage policy, Corresponding HDFS storage policy +|hbase.hstore.compaction.date.tiered.warm.window.age.millis|20600000|| +|hbase.hstore.compaction.date.tiered.warm.window.storage.policy|ONE_SSD|| +|hbase.hstore.compaction.date.tiered.cold.window.storage.policy|HOT|| + +The original date tiered compaction related configuration has the same meaning and maintains + compatibility. +If `hbase.hstore.compaction.date.tiered.storage.policy.enable = false`. DTCP still follows the + original logic and has not changed. + +## Storage strategy +HDFS provides the following storage policies, you can refer to + https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/ArchivalStorage.html + +|Policy ID | Policy Name | Block Placement (3 replicas)| +|:---|:---|:---| +|15|Lasy_Persist|RAM_DISK: 1, DISK: 2| +|12|All_SSD|SSD: 3| +|10|One_SSD|SSD: 1, DISK: 2| +|7|Hot (default)|DISK: 3| +|5|Warm|DISK: 1, ARCHIVE: 2| +|2|Cold|ARCHIVE: 3| + +Date Tiered Compaction (DTCP) supports the output of multiple storefiles. We hope that these + storefiles can be set with different storage policies (in HDFS). + Therefore, through DateTieredMultiFileWriter to generate different StoreFileWriters with + storage policy to achieve the purpose. + +## Why use different child tmp dir +Before StoreFileWriter writes a storefile, we can create different dirs in the tmp directory + of the region and set the corresponding storage policy for these dirs. This way + StoreFileWriter can write files to different dirs. + +Since **HDFS** does not support the create file with the storage policy parameter + (See https://issues.apache.org/jira/browse/HDFS-13209 and now not support on hadoop 2.x), + and HDFS cannot set a storage policy for a file / dir path that does not yet exist. + When the compaction ends, the storefile path must exist at this time, and I set the + storage policy to Storefile. + +But, in HDFS, when the file is written first, and then the storage policy is set. + The actual storage location of the data does not match the storage policy. For example, + write three copies of a file (1 block) in the HDD, then set storage policy is ALL_SSD, + but the data block will not be moved to the SSD immediately. + “HDFS wont move the file content across different block volumes on rename”. Data movement + requires the HDFS mover tool, or use HDFS SPS + (for details, see https://issues.apache.org/jira/browse/HDFS-10285), so in order to + avoid moving data blocks at the HDFS level, we can set the file parent directory to + the storage policy we need before writing data. The new file automatically inherits the + storage policy of the parent directory, and is written according to the correct disk + type when writing. So as to avoid later data movement. + +Over time, the original HOT data will become WARM / COLD and no longer belong to the + HOT window. When the compaction occurs again, the data will be automatically downgraded, + such as from SSD to HDD. The compaction mechanism will generate a new file (write into HDD) + and delete it Old file (SSD). diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index d797d74370b..8104828952e 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -64,6 +64,11 @@ public final class HConstants { */ public static final String RECOVERED_HFILES_DIR = "recovered.hfiles"; + /** + * Date Tiered Compaction tmp dir prefix name if use storage policy + */ + public static final String STORAGE_POLICY_PREFIX = "storage_policy_"; + /** * The first four bytes of Hadoop RPC connections */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMultiFileWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMultiFileWriter.java index f9cc40067d0..f250304952a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMultiFileWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMultiFileWriter.java @@ -43,6 +43,10 @@ public abstract class AbstractMultiFileWriter implements CellSink, ShipperListen public interface WriterFactory { public StoreFileWriter createWriter() throws IOException; + default StoreFileWriter createWriterWithStoragePolicy(String fileStoragePolicy) + throws IOException { + return createWriter(); + }; } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java index a13e4e7cce3..8201cb152c0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java @@ -38,15 +38,20 @@ public class DateTieredMultiFileWriter extends AbstractMultiFileWriter { private final boolean needEmptyFile; + private final Map lowerBoundariesPolicies; + /** + * @param lowerBoundariesPolicies each window to storage policy map. * @param needEmptyFile whether need to create an empty store file if we haven't written out * anything. */ - public DateTieredMultiFileWriter(List lowerBoundaries, boolean needEmptyFile) { + public DateTieredMultiFileWriter(List lowerBoundaries, + Map lowerBoundariesPolicies, boolean needEmptyFile) { for (Long lowerBoundary : lowerBoundaries) { lowerBoundary2Writer.put(lowerBoundary, null); } this.needEmptyFile = needEmptyFile; + this.lowerBoundariesPolicies = lowerBoundariesPolicies; } @Override @@ -54,7 +59,12 @@ public class DateTieredMultiFileWriter extends AbstractMultiFileWriter { Map.Entry entry = lowerBoundary2Writer.floorEntry(cell.getTimestamp()); StoreFileWriter writer = entry.getValue(); if (writer == null) { - writer = writerFactory.createWriter(); + String lowerBoundaryStoragePolicy = lowerBoundariesPolicies.get(entry.getKey()); + if (lowerBoundaryStoragePolicy != null) { + writer = writerFactory.createWriterWithStoragePolicy(lowerBoundaryStoragePolicy); + } else { + writer = writerFactory.createWriter(); + } lowerBoundary2Writer.put(entry.getKey(), writer); } writer.append(cell); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreEngine.java index daae0832fae..1df953d93c9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreEngine.java @@ -93,7 +93,9 @@ public class DateTieredStoreEngine extends StoreEngine compact(ThroughputController throughputController, User user) throws IOException { if (request instanceof DateTieredCompactionRequest) { - return compactor.compact(request, ((DateTieredCompactionRequest) request).getBoundaries(), + DateTieredCompactionRequest compactionRequest = (DateTieredCompactionRequest) request; + return compactor.compact(request, compactionRequest.getBoundaries(), + compactionRequest.getBoundariesPolicies(), throughputController, user); } else { throw new IllegalArgumentException("DateTieredCompactionRequest is expected. Actual: " diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index f9b7e1c0069..8116507230c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -101,6 +101,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDes import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ChecksumType; import org.apache.hadoop.hbase.util.ClassSize; +import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.ReflectionUtils; @@ -1148,7 +1149,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, boolean isCompaction, boolean includeMVCCReadpoint, boolean includesTag, boolean shouldDropBehind) throws IOException { return createWriterInTmp(maxKeyCount, compression, isCompaction, includeMVCCReadpoint, - includesTag, shouldDropBehind, -1); + includesTag, shouldDropBehind, -1, HConstants.EMPTY_STRING); } /** @@ -1162,7 +1163,8 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, // compaction public StoreFileWriter createWriterInTmp(long maxKeyCount, Compression.Algorithm compression, boolean isCompaction, boolean includeMVCCReadpoint, boolean includesTag, - boolean shouldDropBehind, long totalCompactedFilesSize) throws IOException { + boolean shouldDropBehind, long totalCompactedFilesSize, String fileStoragePolicy) + throws IOException { // creating new cache config for each new writer final CacheConfig writerCacheConf = new CacheConfig(cacheConf); if (isCompaction) { @@ -1219,7 +1221,8 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, .withFavoredNodes(favoredNodes) .withFileContext(hFileContext) .withShouldDropCacheBehind(shouldDropBehind) - .withCompactedFilesSupplier(this::getCompactedFiles); + .withCompactedFilesSupplier(this::getCompactedFiles) + .withFileStoragePolicy(fileStoragePolicy); return builder.build(); } @@ -1540,6 +1543,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Collection filesToCompact, User user, long compactionStartTime, List newFiles) throws IOException { // Do the steps necessary to complete the compaction. + setStoragePolicyFromFileName(newFiles); List sfs = moveCompactedFilesIntoPlace(cr, newFiles, user); writeCompactionWalRecord(filesToCompact, sfs); replaceStoreFiles(filesToCompact, sfs); @@ -1569,6 +1573,18 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, return sfs; } + // Set correct storage policy from the file name of DTCP. + // Rename file will not change the storage policy. + private void setStoragePolicyFromFileName(List newFiles) throws IOException { + String prefix = HConstants.STORAGE_POLICY_PREFIX; + for (Path newFile : newFiles) { + if (newFile.getParent().getName().startsWith(prefix)) { + CommonFSUtils.setStoragePolicy(fs.getFileSystem(), newFile, + newFile.getParent().getName().substring(prefix.length())); + } + } + } + private List moveCompactedFilesIntoPlace(CompactionRequestImpl cr, List newFiles, User user) throws IOException { List sfs = new ArrayList<>(newFiles.size()); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileWriter.java index 871aa2705f6..da67c943292 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileWriter.java @@ -66,6 +66,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; +import org.apache.hbase.thirdparty.com.google.common.base.Strings; import org.apache.hbase.thirdparty.com.google.common.collect.SetMultimap; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; @@ -437,6 +438,7 @@ public class StoreFileWriter implements CellSink, ShipperListener { private HFileContext fileContext; private boolean shouldDropCacheBehind; private Supplier> compactedFilesSupplier = () -> Collections.emptySet(); + private String fileStoragePolicy; public Builder(Configuration conf, CacheConfig cacheConf, FileSystem fs) { @@ -518,6 +520,11 @@ public class StoreFileWriter implements CellSink, ShipperListener { return this; } + public Builder withFileStoragePolicy(String fileStoragePolicy) { + this.fileStoragePolicy = fileStoragePolicy; + return this; + } + /** * Create a store file writer. Client is responsible for closing file when * done. If metadata, add BEFORE closing using @@ -547,6 +554,20 @@ public class StoreFileWriter implements CellSink, ShipperListener { CommonFSUtils.setStoragePolicy(this.fs, dir, policyName); if (filePath == null) { + // The stored file and related blocks will used the directory based StoragePolicy. + // Because HDFS DistributedFileSystem does not support create files with storage policy + // before version 3.3.0 (See HDFS-13209). Use child dir here is to make stored files + // satisfy the specific storage policy when writing. So as to avoid later data movement. + // We don't want to change whole temp dir to 'fileStoragePolicy'. + if (!Strings.isNullOrEmpty(fileStoragePolicy)) { + dir = new Path(dir, HConstants.STORAGE_POLICY_PREFIX + fileStoragePolicy); + if (!fs.exists(dir)) { + HRegionFileSystem.mkdirs(fs, conf, dir); + LOG.info( + "Create tmp dir " + dir.toString() + " with storage policy: " + fileStoragePolicy); + } + CommonFSUtils.setStoragePolicy(this.fs, dir, fileStoragePolicy); + } filePath = getUniqueFile(fs, dir); if (!BloomFilterFactory.isGeneralBloomEnabled(conf)) { bloomType = BloomType.NONE; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java index a8ffc2e4ea7..f2816d89686 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java @@ -53,6 +53,12 @@ public abstract class AbstractMultiOutputCompactor DEFAULT_DATE_TIERED_COMPACTION_WINDOW_FACTORY_CLASS = ExponentialCompactionWindowFactory.class; + public static final String DATE_TIERED_STORAGE_POLICY_ENABLE_KEY = + "hbase.hstore.compaction.date.tiered.storage.policy.enable"; + public static final String DATE_TIERED_HOT_WINDOW_AGE_MILLIS_KEY = + "hbase.hstore.compaction.date.tiered.hot.window.age.millis"; + public static final String DATE_TIERED_HOT_WINDOW_STORAGE_POLICY_KEY = + "hbase.hstore.compaction.date.tiered.hot.window.storage.policy"; + public static final String DATE_TIERED_WARM_WINDOW_AGE_MILLIS_KEY = + "hbase.hstore.compaction.date.tiered.warm.window.age.millis"; + public static final String DATE_TIERED_WARM_WINDOW_STORAGE_POLICY_KEY = + "hbase.hstore.compaction.date.tiered.warm.window.storage.policy"; + /** Windows older than warm age belong to COLD_WINDOW **/ + public static final String DATE_TIERED_COLD_WINDOW_STORAGE_POLICY_KEY = + "hbase.hstore.compaction.date.tiered.cold.window.storage.policy"; + Configuration conf; StoreConfigInformation storeConfigInfo; @@ -113,6 +127,12 @@ public class CompactionConfiguration { private final String compactionPolicyForDateTieredWindow; private final boolean dateTieredSingleOutputForMinorCompaction; private final String dateTieredCompactionWindowFactory; + private final boolean dateTieredStoragePolicyEnable; + private long hotWindowAgeMillis; + private long warmWindowAgeMillis; + private String hotWindowStoragePolicy; + private String warmWindowStoragePolicy; + private String coldWindowStoragePolicy; CompactionConfiguration(Configuration conf, StoreConfigInformation storeConfigInfo) { this.conf = conf; @@ -147,6 +167,13 @@ public class CompactionConfiguration { this.dateTieredCompactionWindowFactory = conf.get( DATE_TIERED_COMPACTION_WINDOW_FACTORY_CLASS_KEY, DEFAULT_DATE_TIERED_COMPACTION_WINDOW_FACTORY_CLASS.getName()); + // for Heterogeneous Storage + dateTieredStoragePolicyEnable = conf.getBoolean(DATE_TIERED_STORAGE_POLICY_ENABLE_KEY, false); + hotWindowAgeMillis = conf.getLong(DATE_TIERED_HOT_WINDOW_AGE_MILLIS_KEY, 86400000L); + hotWindowStoragePolicy = conf.get(DATE_TIERED_HOT_WINDOW_STORAGE_POLICY_KEY, "ALL_SSD"); + warmWindowAgeMillis = conf.getLong(DATE_TIERED_WARM_WINDOW_AGE_MILLIS_KEY, 604800000L); + warmWindowStoragePolicy = conf.get(DATE_TIERED_WARM_WINDOW_STORAGE_POLICY_KEY, "ONE_SSD"); + coldWindowStoragePolicy = conf.get(DATE_TIERED_COLD_WINDOW_STORAGE_POLICY_KEY, "HOT"); LOG.info(toString()); } @@ -293,4 +320,28 @@ public class CompactionConfiguration { public String getDateTieredCompactionWindowFactory() { return dateTieredCompactionWindowFactory; } + + public boolean isDateTieredStoragePolicyEnable() { + return dateTieredStoragePolicyEnable; + } + + public long getHotWindowAgeMillis() { + return hotWindowAgeMillis; + } + + public long getWarmWindowAgeMillis() { + return warmWindowAgeMillis; + } + + public String getHotWindowStoragePolicy() { + return hotWindowStoragePolicy.trim().toUpperCase(); + } + + public String getWarmWindowStoragePolicy() { + return warmWindowStoragePolicy.trim().toUpperCase(); + } + + public String getColdWindowStoragePolicy() { + return coldWindowStoragePolicy.trim().toUpperCase(); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java index bc8d9432da3..250825b23b2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java @@ -268,7 +268,15 @@ public abstract class Compactor { // See HBASE-8166, HBASE-12600, and HBASE-13389. return store .createWriterInTmp(fd.maxKeyCount, this.compactionCompression, true, fd.maxMVCCReadpoint > 0, - fd.maxTagsLength > 0, shouldDropBehind, fd.totalCompactedFilesSize); + fd.maxTagsLength > 0, shouldDropBehind, fd.totalCompactedFilesSize, + HConstants.EMPTY_STRING); + } + + protected final StoreFileWriter createTmpWriter(FileDetails fd, boolean shouldDropBehind, + String fileStoragePolicy) throws IOException { + return store + .createWriterInTmp(fd.maxKeyCount, this.compactionCompression, true, fd.maxMVCCReadpoint > 0, + fd.maxTagsLength > 0, shouldDropBehind, fd.totalCompactedFilesSize, fileStoragePolicy); } private ScanInfo preCompactScannerOpen(CompactionRequestImpl request, ScanType scanType, diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionPolicy.java index 58969bfb246..1cc7dda0948 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionPolicy.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionPolicy.java @@ -22,7 +22,9 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.OptionalLong; import org.apache.hadoop.conf.Configuration; @@ -198,8 +200,10 @@ public class DateTieredCompactionPolicy extends SortedCompactionPolicy { public CompactionRequestImpl selectMajorCompaction(ArrayList candidateSelection) { long now = EnvironmentEdgeManager.currentTime(); + List boundaries = getCompactBoundariesForMajor(candidateSelection, now); + Map boundariesPolicies = getBoundariesStoragePolicyForMajor(boundaries, now); return new DateTieredCompactionRequest(candidateSelection, - this.getCompactBoundariesForMajor(candidateSelection, now)); + boundaries, boundariesPolicies); } /** @@ -253,7 +257,7 @@ public class DateTieredCompactionPolicy extends SortedCompactionPolicy { LOG.debug("Processing files: " + fileList + " for window: " + window); } DateTieredCompactionRequest request = generateCompactionRequest(fileList, window, - mayUseOffPeak, mayBeStuck, minThreshold); + mayUseOffPeak, mayBeStuck, minThreshold, now); if (request != null) { return request; } @@ -265,8 +269,8 @@ public class DateTieredCompactionPolicy extends SortedCompactionPolicy { } private DateTieredCompactionRequest generateCompactionRequest(ArrayList storeFiles, - CompactionWindow window, boolean mayUseOffPeak, boolean mayBeStuck, int minThreshold) - throws IOException { + CompactionWindow window, boolean mayUseOffPeak, boolean mayBeStuck, int minThreshold, + long now) throws IOException { // The files has to be in ascending order for ratio-based compaction to work right // and removeExcessFile to exclude youngest files. Collections.reverse(storeFiles); @@ -281,8 +285,11 @@ public class DateTieredCompactionPolicy extends SortedCompactionPolicy { boolean singleOutput = storeFiles.size() != storeFileSelection.size() || comConf.useDateTieredSingleOutputForMinorCompaction(); List boundaries = getCompactionBoundariesForMinor(window, singleOutput); + // we want to generate policy to boundaries for minor compaction + Map boundaryPolicyMap = + getBoundariesStoragePolicyForMinor(singleOutput, window, now); DateTieredCompactionRequest result = new DateTieredCompactionRequest(storeFileSelection, - boundaries); + boundaries, boundaryPolicyMap); return result; } return null; @@ -334,4 +341,39 @@ public class DateTieredCompactionPolicy extends SortedCompactionPolicy { return Long.MIN_VALUE; } } + + private Map getBoundariesStoragePolicyForMinor(boolean singleOutput, + CompactionWindow window, long now) { + Map boundariesPolicy = new HashMap<>(); + if (!comConf.isDateTieredStoragePolicyEnable()) { + return boundariesPolicy; + } + String windowStoragePolicy = getWindowStoragePolicy(now, window.startMillis()); + if (singleOutput) { + boundariesPolicy.put(Long.MIN_VALUE, windowStoragePolicy); + } else { + boundariesPolicy.put(window.startMillis(), windowStoragePolicy); + } + return boundariesPolicy; + } + + private Map getBoundariesStoragePolicyForMajor(List boundaries, long now) { + Map boundariesPolicy = new HashMap<>(); + if (!comConf.isDateTieredStoragePolicyEnable()) { + return boundariesPolicy; + } + for (Long startTs : boundaries) { + boundariesPolicy.put(startTs, getWindowStoragePolicy(now, startTs)); + } + return boundariesPolicy; + } + + private String getWindowStoragePolicy(long now, long windowStartMillis) { + if (windowStartMillis >= (now - comConf.getHotWindowAgeMillis())) { + return comConf.getHotWindowStoragePolicy(); + } else if (windowStartMillis >= (now - comConf.getWarmWindowAgeMillis())) { + return comConf.getWarmWindowStoragePolicy(); + } + return comConf.getColdWindowStoragePolicy(); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionRequest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionRequest.java index 37b7059cc2a..ddf9a0ce2ef 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionRequest.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionRequest.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver.compactions; import java.util.Arrays; import java.util.Collection; import java.util.List; +import java.util.Map; import org.apache.hadoop.hbase.regionserver.HStoreFile; import org.apache.yetus.audience.InterfaceAudience; @@ -28,18 +29,27 @@ import org.apache.yetus.audience.InterfaceAudience; @InterfaceAudience.Private public class DateTieredCompactionRequest extends CompactionRequestImpl { private List boundaries; + /** window start boundary to window storage policy map **/ + private Map boundariesPolicies; - public DateTieredCompactionRequest(Collection files, List boundaryList) { + public DateTieredCompactionRequest(Collection files, List boundaryList, + Map boundaryPolicyMap) { super(files); boundaries = boundaryList; + boundariesPolicies = boundaryPolicyMap; } public List getBoundaries() { return boundaries; } + public Map getBoundariesPolicies() { + return boundariesPolicies; + } + @Override public String toString() { - return super.toString() + " boundaries=" + Arrays.toString(boundaries.toArray()); + return super.toString() + " boundaries=" + Arrays.toString(boundaries.toArray()) + + " boundariesPolicies="+boundariesPolicies.toString(); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java index 1bf52365e76..ef64df1efe2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.regionserver.compactions; import java.io.IOException; import java.util.List; +import java.util.Map; import java.util.OptionalLong; import org.apache.hadoop.conf.Configuration; @@ -55,6 +56,7 @@ public class DateTieredCompactor extends AbstractMultiOutputCompactor compact(final CompactionRequestImpl request, final List lowerBoundaries, + final Map lowerBoundariesPolicies, ThroughputController throughputController, User user) throws IOException { if (LOG.isDebugEnabled()) { LOG.debug("Executing compaction with " + lowerBoundaries.size() @@ -68,6 +70,7 @@ public class DateTieredCompactor extends AbstractMultiOutputCompactor candidates, long[] expectedFileSizes, long[] expectedBoundaries, boolean isMajor, boolean toCompact) throws IOException { + DateTieredCompactionRequest request = getRequest(now, candidates, isMajor, toCompact); + List actual = Lists.newArrayList(request.getFiles()); + assertEquals(Arrays.toString(expectedFileSizes), Arrays.toString(getSizes(actual))); + assertEquals(Arrays.toString(expectedBoundaries), + Arrays.toString(request.getBoundaries().toArray())); + } + + private DateTieredCompactionRequest getRequest(long now, ArrayList candidates, + boolean isMajor, boolean toCompact) throws IOException { ManualEnvironmentEdge timeMachine = new ManualEnvironmentEdge(); EnvironmentEdgeManager.injectEdge(timeMachine); timeMachine.setValue(now); DateTieredCompactionRequest request; DateTieredCompactionPolicy policy = - (DateTieredCompactionPolicy) store.storeEngine.getCompactionPolicy(); + (DateTieredCompactionPolicy) store.storeEngine.getCompactionPolicy(); if (isMajor) { for (HStoreFile file : candidates) { ((MockHStoreFile) file).setIsMajor(true); @@ -72,11 +82,18 @@ public class AbstractTestDateTieredCompactionPolicy extends TestCompactionPolicy } else { assertEquals(toCompact, policy.needsCompaction(candidates, ImmutableList.of())); request = - (DateTieredCompactionRequest) policy.selectMinorCompaction(candidates, false, false); + (DateTieredCompactionRequest) policy.selectMinorCompaction(candidates, false, false); + } + return request; + } + + protected void compactEqualsStoragePolicy(long now, ArrayList candidates, + Map expectedBoundariesPolicies, boolean isMajor, boolean toCompact) + throws IOException { + DateTieredCompactionRequest request = getRequest(now, candidates, isMajor, toCompact); + Map boundariesPolicies = request.getBoundariesPolicies(); + for (Map.Entry entry : expectedBoundariesPolicies.entrySet()) { + assertEquals(entry.getValue(), boundariesPolicies.get(entry.getKey())); } - List actual = Lists.newArrayList(request.getFiles()); - assertEquals(Arrays.toString(expectedFileSizes), Arrays.toString(getSizes(actual))); - assertEquals(Arrays.toString(expectedBoundaries), - Arrays.toString(request.getBoundaries().toArray())); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDateTieredCompactionPolicyHeterogeneousStorage.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDateTieredCompactionPolicyHeterogeneousStorage.java new file mode 100644 index 00000000000..74210e620fd --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDateTieredCompactionPolicyHeterogeneousStorage.java @@ -0,0 +1,189 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration; +import org.apache.hadoop.hbase.regionserver.compactions.ExponentialCompactionWindowFactory; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ RegionServerTests.class, SmallTests.class }) +public class TestDateTieredCompactionPolicyHeterogeneousStorage + extends AbstractTestDateTieredCompactionPolicy { + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestDateTieredCompactionPolicyHeterogeneousStorage.class); + public static final String HOT_WINDOW_SP = "ALL_SSD"; + public static final String WARM_WINDOW_SP = "ONE_SSD"; + public static final String COLD_WINDOW_SP = "HOT"; + + @Override + protected void config() { + super.config(); + + // Set up policy + conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, + "org.apache.hadoop.hbase.regionserver.DateTieredStoreEngine"); + conf.setLong(CompactionConfiguration.DATE_TIERED_MAX_AGE_MILLIS_KEY, 100); + conf.setLong(CompactionConfiguration.DATE_TIERED_INCOMING_WINDOW_MIN_KEY, 3); + conf.setLong(ExponentialCompactionWindowFactory.BASE_WINDOW_MILLIS_KEY, 6); + conf.setInt(ExponentialCompactionWindowFactory.WINDOWS_PER_TIER_KEY, 4); + conf.setBoolean(CompactionConfiguration.DATE_TIERED_SINGLE_OUTPUT_FOR_MINOR_COMPACTION_KEY, + false); + + // Special settings for compaction policy per window + this.conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 2); + this.conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 12); + this.conf.setFloat(CompactionConfiguration.HBASE_HSTORE_COMPACTION_RATIO_KEY, 1.2F); + + conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 20); + conf.setLong(HConstants.MAJOR_COMPACTION_PERIOD, 5); + + // Set Storage Policy for different type window + conf.setBoolean(CompactionConfiguration.DATE_TIERED_STORAGE_POLICY_ENABLE_KEY, true); + conf.setLong(CompactionConfiguration.DATE_TIERED_HOT_WINDOW_AGE_MILLIS_KEY, 6); + conf.set(CompactionConfiguration.DATE_TIERED_HOT_WINDOW_STORAGE_POLICY_KEY, HOT_WINDOW_SP); + conf.setLong(CompactionConfiguration.DATE_TIERED_WARM_WINDOW_AGE_MILLIS_KEY, 12); + conf.set(CompactionConfiguration.DATE_TIERED_WARM_WINDOW_STORAGE_POLICY_KEY, WARM_WINDOW_SP); + conf.set(CompactionConfiguration.DATE_TIERED_COLD_WINDOW_STORAGE_POLICY_KEY, COLD_WINDOW_SP); + } + + /** + * Test for minor compaction of incoming window. + * Incoming window start ts >= now - hot age. So it is HOT window, will use HOT_WINDOW_SP. + * @throws IOException with error + */ + @Test + public void testIncomingWindowHot() throws IOException { + long[] minTimestamps = new long[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 }; + long[] maxTimestamps = new long[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15 }; + long[] sizes = new long[] { 30, 31, 32, 33, 34, 20, 21, 22, 23, 24, 25, 10, 11, 12, 13 }; + Map expected = new HashMap<>(); + // expected DateTieredCompactionRequest boundaries = { Long.MIN_VALUE, 12 } + // test whether DateTieredCompactionRequest boundariesPolicies matches expected + expected.put(12L, HOT_WINDOW_SP); + compactEqualsStoragePolicy(16, sfCreate(minTimestamps, maxTimestamps, sizes), + expected, false, true); + } + + /** + * Test for not incoming window. + * now - hot age > window start >= now - warm age, + * so this window and is WARM window, will use WARM_WINDOW_SP + * @throws IOException with error + */ + @Test + public void testNotIncomingWindowWarm() throws IOException { + long[] minTimestamps = new long[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 }; + long[] maxTimestamps = new long[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13 }; + long[] sizes = new long[] { 30, 31, 32, 33, 34, 20, 21, 22, 23, 24, 25, 10, 11 }; + Map expected = new HashMap<>(); + // expected DateTieredCompactionRequest boundaries = { Long.MIN_VALUE, 6 } + expected.put(6L, WARM_WINDOW_SP); + compactEqualsStoragePolicy(16, sfCreate(minTimestamps, maxTimestamps, sizes), + expected, false, true); + } + + /** + * Test for not incoming window. + * this window start ts >= ow - hot age, + * So this incoming window and is HOT window. Use HOT_WINDOW_SP + * @throws IOException with error + */ + @Test + public void testNotIncomingWindowAndIsHot() throws IOException { + long[] minTimestamps = new long[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 }; + long[] maxTimestamps = new long[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13 }; + long[] sizes = new long[] { 30, 31, 32, 33, 34, 20, 21, 22, 23, 24, 25, 10, 11 }; + Map expected = new HashMap<>(); + // expected DateTieredCompactionRequest boundaries = { Long.MIN_VALUE, 6 } + expected.put(6L, HOT_WINDOW_SP); + compactEqualsStoragePolicy(12, sfCreate(minTimestamps, maxTimestamps, sizes), + expected, false, true); + } + + /** + * Test for not incoming window. + * COLD window start timestamp < now - warm age, so use COLD_WINDOW_SP + * @throws IOException with error + */ + @Test + public void testColdWindow() throws IOException { + long[] minTimestamps = new long[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 }; + long[] maxTimestamps = new long[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12 }; + long[] sizes = new long[] { 30, 31, 32, 33, 34, 20, 21, 22, 23, 24, 25, 10 }; + Map expected = new HashMap<>(); + // expected DateTieredCompactionRequest boundaries = { Long.MIN_VALUE, 6 } + expected.put(6L, COLD_WINDOW_SP); + compactEqualsStoragePolicy(22, sfCreate(minTimestamps, maxTimestamps, sizes), + expected, false, true); + } + + /** + * Test for not incoming window. but not all hfiles will be selected to compact. + * Apply exploring logic on non-incoming window. More than one hfile left in this window. + * this means minor compact single out is true. boundaries only contains Long.MIN_VALUE + * @throws IOException with error + */ + @Test + public void testRatioT0() throws IOException { + long[] minTimestamps = new long[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 }; + long[] maxTimestamps = new long[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12 }; + long[] sizes = new long[] { 30, 31, 32, 33, 34, 20, 21, 22, 280, 23, 24, 1 }; + Map expected = new HashMap<>(); + // window start = 6, expected DateTieredCompactionRequest boundaries = { Long.MIN_VALUE } + expected.put(Long.MIN_VALUE, WARM_WINDOW_SP); + compactEqualsStoragePolicy(16, sfCreate(minTimestamps, maxTimestamps, sizes), + expected, false, true); + } + + /** + * Test for Major compaction. It will compact all files and create multi output files + * with different window storage policy. + * @throws IOException with error + */ + @Test + public void testMajorCompation() throws IOException { + long[] minTimestamps = new long[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 }; + long[] maxTimestamps = new long[] { 44, 60, 61, 96, 100, 104, 105, 106, 113, 145, 157 }; + long[] sizes = new long[] { 0, 50, 51, 40, 41, 42, 33, 30, 31, 2, 1 }; + Map expected = new HashMap<>(); + expected.put(Long.MIN_VALUE, COLD_WINDOW_SP); + expected.put(24L, COLD_WINDOW_SP); + expected.put(48L, COLD_WINDOW_SP); + expected.put(72L, COLD_WINDOW_SP); + expected.put(96L, COLD_WINDOW_SP); + expected.put(120L, COLD_WINDOW_SP); + expected.put(144L, COLD_WINDOW_SP); + expected.put(150L, WARM_WINDOW_SP); + expected.put(156L, HOT_WINDOW_SP); + compactEquals(161, sfCreate(minTimestamps, maxTimestamps, sizes), + new long[] { 0, 50, 51, 40, 41, 42, 33, 30, 31, 2, 1 }, + new long[] { Long.MIN_VALUE, 24, 48, 72, 96, 120, 144, 150, 156 }, true, true); + compactEqualsStoragePolicy(161, sfCreate(minTimestamps, maxTimestamps, sizes), + expected,true, true); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDateTieredCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDateTieredCompactor.java index 2d49085c542..ef0e41e8d4b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDateTieredCompactor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDateTieredCompactor.java @@ -24,12 +24,14 @@ import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.List; import java.util.OptionalLong; import org.apache.hadoop.conf.Configuration; @@ -111,7 +113,7 @@ public class TestDateTieredCompactor { when(store.createWriterInTmp(anyLong(), any(), anyBoolean(), anyBoolean(), anyBoolean(), anyBoolean())).thenAnswer(writers); when(store.createWriterInTmp(anyLong(), any(), anyBoolean(), - anyBoolean(), anyBoolean(), anyBoolean(), anyLong())).thenAnswer(writers); + anyBoolean(), anyBoolean(), anyBoolean(), anyLong(), anyString())).thenAnswer(writers); when(store.getComparator()).thenReturn(CellComparatorImpl.COMPARATOR); OptionalLong maxSequenceId = StoreUtils.getMaxSequenceIdInList(storefiles); when(store.getMaxSequenceId()).thenReturn(maxSequenceId); @@ -140,7 +142,8 @@ public class TestDateTieredCompactor { HStoreFile sf2 = createDummyStoreFile(2L); DateTieredCompactor dtc = createCompactor(writers, input, Arrays.asList(sf1, sf2)); List paths = dtc.compact(new CompactionRequestImpl(Arrays.asList(sf1)), - boundaries.subList(0, boundaries.size() - 1), NoLimitThroughputController.INSTANCE, null); + boundaries.subList(0, boundaries.size() - 1), new HashMap(), + NoLimitThroughputController.INSTANCE, null); writers.verifyKvs(output, allFiles, boundaries); if (allFiles) { assertEquals(output.length, paths.size()); @@ -169,7 +172,7 @@ public class TestDateTieredCompactor { DateTieredCompactor dtc = createCompactor(writers, new KeyValue[0], new ArrayList<>(request.getFiles())); List paths = dtc.compact(request, Arrays.asList(Long.MIN_VALUE, Long.MAX_VALUE), - NoLimitThroughputController.INSTANCE, null); + new HashMap(), NoLimitThroughputController.INSTANCE, null); assertEquals(1, paths.size()); List dummyWriters = writers.getWriters(); assertEquals(1, dummyWriters.size()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java index e8d70d7eea8..79b0a835d51 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java @@ -27,6 +27,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.isNull; @@ -800,7 +801,7 @@ public class TestStripeCompactionPolicy { anyBoolean(), anyBoolean(), anyBoolean())).thenAnswer(writers); when( store.createWriterInTmp(anyLong(), any(), anyBoolean(), - anyBoolean(), anyBoolean(), anyBoolean(), anyLong())).thenAnswer(writers); + anyBoolean(), anyBoolean(), anyBoolean(), anyLong(), anyString())).thenAnswer(writers); Configuration conf = HBaseConfiguration.create(); conf.setBoolean("hbase.regionserver.compaction.private.readers", usePrivateReaders); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactor.java index 36ac142e4a6..c1572654dcb 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactor.java @@ -23,6 +23,7 @@ import static org.junit.Assert.assertEquals; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -211,7 +212,7 @@ public class TestStripeCompactor { when(store.createWriterInTmp(anyLong(), any(), anyBoolean(), anyBoolean(), anyBoolean(), anyBoolean())).thenAnswer(writers); when(store.createWriterInTmp(anyLong(), any(), anyBoolean(), - anyBoolean(), anyBoolean(), anyBoolean(), anyLong())).thenAnswer(writers); + anyBoolean(), anyBoolean(), anyBoolean(), anyLong(), anyString())).thenAnswer(writers); when(store.getComparator()).thenReturn(CellComparatorImpl.COMPARATOR); return new StripeCompactor(conf, store) {