HBASE-17172 Optimize major mob compaction with _del files

Signed-off-by: tedyu <yuzhihong@gmail.com>
This commit is contained in:
Huaxiang Sun 2017-01-06 09:25:49 -08:00 committed by tedyu
parent 938aef772d
commit d7325185ad
3 changed files with 659 additions and 68 deletions

View File

@ -24,8 +24,11 @@ import java.util.Collections;
import java.util.List;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.mob.MobConstants;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
/**
@ -37,14 +40,14 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@InterfaceAudience.Private
public class PartitionedMobCompactionRequest extends MobCompactionRequest {
protected Collection<FileStatus> delFiles;
protected List<CompactionDelPartition> delPartitions;
protected Collection<CompactionPartition> compactionPartitions;
public PartitionedMobCompactionRequest(Collection<CompactionPartition> compactionPartitions,
Collection<FileStatus> delFiles) {
List<CompactionDelPartition> delPartitions) {
this.selectionTime = EnvironmentEdgeManager.currentTime();
this.compactionPartitions = compactionPartitions;
this.delFiles = delFiles;
this.delPartitions = delPartitions;
}
/**
@ -59,8 +62,8 @@ public class PartitionedMobCompactionRequest extends MobCompactionRequest {
* Gets the del files.
* @return The del files.
*/
public Collection<FileStatus> getDelFiles() {
return this.delFiles;
public List<CompactionDelPartition> getDelPartitions() {
return this.delPartitions;
}
/**
@ -72,6 +75,10 @@ public class PartitionedMobCompactionRequest extends MobCompactionRequest {
private List<FileStatus> files = new ArrayList<FileStatus>();
private CompactionPartitionId partitionId;
// The startKey and endKey of this partition, both are inclusive.
private byte[] startKey;
private byte[] endKey;
/**
 * Creates a partition that is identified by the given id.
 * @param partitionId The id of this partition.
 */
public CompactionPartition(final CompactionPartitionId partitionId) {
  this.partitionId = partitionId;
}
@ -91,6 +98,35 @@ public class PartitionedMobCompactionRequest extends MobCompactionRequest {
/**
 * Gets the number of files held by this partition.
 * @return The size of the file list of this partition.
 */
public int getFileCount() {
  return this.files.size();
}
/**
 * Gets the start key recorded for this partition.
 * @return The current start key, or null if none has been set.
 */
public byte[] getStartKey() {
  return this.startKey;
}
/**
 * Sets the start key of this partition, only if no start key has been recorded yet or the
 * input startKey is less than the current start key.
 * A null input carries no key information, so it is ignored instead of being handed to
 * Bytes.compareTo together with an already recorded key.
 * @param startKey The candidate start key, may be null.
 */
public void setStartKey(final byte[] startKey) {
  if (startKey == null) {
    // Nothing to compare against; keep whatever start key is already recorded.
    return;
  }
  if ((this.startKey == null) || (Bytes.compareTo(startKey, this.startKey) < 0)) {
    this.startKey = startKey;
  }
}
/**
 * Gets the end key recorded for this partition.
 * @return The current end key, or null if none has been set.
 */
public byte[] getEndKey() {
  return this.endKey;
}
/**
 * Sets the end key of this partition, only if no end key has been recorded yet or the
 * input endKey is greater than the current end key.
 * A null input carries no key information, so it is ignored instead of being handed to
 * Bytes.compareTo together with an already recorded key.
 * @param endKey The candidate end key, may be null.
 */
public void setEndKey(final byte[] endKey) {
  if (endKey == null) {
    // Nothing to compare against; keep whatever end key is already recorded.
    return;
  }
  if ((this.endKey == null) || (Bytes.compareTo(endKey, this.endKey) > 0)) {
    this.endKey = endKey;
  }
}
}
/**
@ -183,4 +219,116 @@ public class PartitionedMobCompactionRequest extends MobCompactionRequest {
return new StringBuilder(startKey).append(date).toString();
}
}
/**
 * A partition of mob delete files in the mob compaction.
 * A delete partition is identified by a [startKey, endKey] pair; it carries the paths of its
 * del files and the store files that have been attached to it.
 */
protected static class CompactionDelPartition {
  private final CompactionDelPartitionId id;
  private final List<Path> delFiles = new ArrayList<>();
  private final List<StoreFile> storeFiles = new ArrayList<>();

  public CompactionDelPartition(CompactionDelPartitionId id) {
    this.id = id;
  }

  /**
   * @return The id (key range) of this delete partition.
   */
  public CompactionDelPartitionId getId() {
    return id;
  }

  /**
   * Records the path of the given del file in this partition.
   */
  void addDelFile(FileStatus file) {
    final Path delFilePath = file.getPath();
    this.delFiles.add(delFilePath);
  }

  /**
   * Attaches a store file to this partition.
   */
  public void addStoreFile(final StoreFile file) {
    this.storeFiles.add(file);
  }

  /**
   * @return The live list of store files attached to this partition.
   */
  public List<StoreFile> getStoreFiles() {
    return this.storeFiles;
  }

  /**
   * @return A read-only view of the del file paths of this partition.
   */
  List<Path> listDelFiles() {
    return Collections.unmodifiableList(this.delFiles);
  }

  /**
   * Appends all the given del file paths to this partition.
   */
  void addDelFileList(final Collection<Path> list) {
    this.delFiles.addAll(list);
  }

  /**
   * @return The number of del file paths held by this partition.
   */
  int getDelFileCount() {
    return this.delFiles.size();
  }

  /**
   * Drops all the del file paths held by this partition.
   */
  void cleanDelFiles() {
    this.delFiles.clear();
  }
}
/**
 * The delete partition id that consists of start key and end key.
 * Ids are ordered by start key first and by end key second. Equality and hashing are both
 * based on the content of the two keys and tolerate keys that have not been set yet.
 */
public static class CompactionDelPartitionId implements Comparable<CompactionDelPartitionId> {
  private byte[] startKey;
  private byte[] endKey;

  /**
   * Creates an id whose start key and end key are both unset (null).
   */
  public CompactionDelPartitionId() {
  }

  /**
   * @param startKey The start key of the delete partition.
   * @param endKey The end key of the delete partition.
   */
  public CompactionDelPartitionId(final byte[] startKey, final byte[] endKey) {
    this.startKey = startKey;
    this.endKey = endKey;
  }

  public byte[] getStartKey() {
    return this.startKey;
  }

  public void setStartKey(final byte[] startKey) {
    this.startKey = startKey;
  }

  public byte[] getEndKey() {
    return this.endKey;
  }

  public void setEndKey(final byte[] endKey) {
    this.endKey = endKey;
  }

  /**
   * Compares by start key, then by end key. Both keys of both ids are passed to
   * Bytes.compareTo, so they are expected to be set before ids are ordered.
   */
  @Override
  public int compareTo(CompactionDelPartitionId o) {
    /*
     * 1). Compare the start key, if the k1 < k2, then k1 is less
     * 2). If start Key is same, check endKey, k1 < k2, k1 is less
     *     If both are same, then they are equal.
     */
    int result = Bytes.compareTo(this.startKey, o.getStartKey());
    if (result != 0) {
      return result;
    }

    return Bytes.compareTo(this.endKey, o.getEndKey());
  }

  @Override
  public int hashCode() {
    int result = 17;
    result = 31 * result + java.util.Arrays.hashCode(startKey);
    result = 31 * result + java.util.Arrays.hashCode(endKey);
    return result;
  }

  @Override
  public boolean equals(Object obj) {
    if (this == obj) {
      return true;
    }

    if (!(obj instanceof CompactionDelPartitionId)) {
      return false;
    }

    CompactionDelPartitionId another = (CompactionDelPartitionId) obj;
    // Content comparison that mirrors hashCode() and, unlike compareTo(), copes with keys
    // that are still null (e.g. an id built with the no-arg constructor).
    return java.util.Arrays.equals(this.startKey, another.startKey)
        && java.util.Arrays.equals(this.endKey, another.endKey);
  }
}
}

View File

@ -24,16 +24,19 @@ import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@ -58,11 +61,15 @@ import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.io.crypto.Encryption;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.mob.MobConstants;
import org.apache.hadoop.hbase.mob.MobFileName;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.mob.compactions.MobCompactionRequest.CompactionType;
import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.CompactionDelPartition;
import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.CompactionDelPartitionId;
import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.CompactionPartition;
import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.CompactionPartitionId;
import org.apache.hadoop.hbase.regionserver.BloomType;
@ -132,6 +139,7 @@ public class PartitionedMobCompactor extends MobCompactor {
return null;
}
LOG.info("is allFiles: " + allFiles);
// find the files to compact.
PartitionedMobCompactionRequest request = select(files, allFiles);
// compact the files.
@ -148,11 +156,14 @@ public class PartitionedMobCompactor extends MobCompactor {
*/
protected PartitionedMobCompactionRequest select(List<FileStatus> candidates,
boolean allFiles) throws IOException {
final Collection<FileStatus> allDelFiles = new ArrayList<>();
final Map<CompactionPartitionId, CompactionPartition> filesToCompact = new HashMap<>();
final CompactionPartitionId id = new CompactionPartitionId();
final NavigableMap<CompactionDelPartitionId, CompactionDelPartition> delFilesToCompact = new TreeMap<>();
final CompactionDelPartitionId delId = new CompactionDelPartitionId();
final ArrayList<CompactionDelPartition> allDelPartitions = new ArrayList<>();
int selectedFileCount = 0;
int irrelevantFileCount = 0;
int totalDelFiles = 0;
MobCompactPartitionPolicy policy = column.getMobCompactPartitionPolicy();
Calendar calendar = Calendar.getInstance();
@ -167,6 +178,31 @@ public class PartitionedMobCompactor extends MobCompactor {
firstDayOfCurrentWeek = MobUtils.getFirstDayOfWeek(calendar, currentDate);
}
// We check if there is any del files so the logic can be optimized for the following processing
// First step is to check if there is any delete files. If there is any delete files,
// For each Partition, it needs to read its startKey and endKey from files.
// If there is no delete file, there is no need to read startKey and endKey from files, this
// is an optimization.
boolean withDelFiles = false;
for (FileStatus file : candidates) {
if (!file.isFile()) {
continue;
}
// group the del files and small files.
FileStatus linkedFile = file;
if (HFileLink.isHFileLink(file.getPath())) {
HFileLink link = HFileLink.buildFromHFileLinkPattern(conf, file.getPath());
linkedFile = getLinkedFileStatus(link);
if (linkedFile == null) {
continue;
}
}
if (StoreFileInfo.isDelFile(linkedFile.getPath())) {
withDelFiles = true;
break;
}
}
for (FileStatus file : candidates) {
if (!file.isFile()) {
irrelevantFileCount++;
@ -183,13 +219,32 @@ public class PartitionedMobCompactor extends MobCompactor {
continue;
}
}
if (StoreFileInfo.isDelFile(linkedFile.getPath())) {
allDelFiles.add(file);
if (withDelFiles && StoreFileInfo.isDelFile(linkedFile.getPath())) {
// File in the Del Partition List
// Get delId from the file
Reader reader = HFile.createReader(fs, linkedFile.getPath(), CacheConfig.DISABLED, conf);
try {
delId.setStartKey(reader.getFirstRowKey());
delId.setEndKey(reader.getLastRowKey());
} finally {
reader.close();
}
CompactionDelPartition delPartition = delFilesToCompact.get(delId);
if (delPartition == null) {
CompactionDelPartitionId newDelId =
new CompactionDelPartitionId(delId.getStartKey(), delId.getEndKey());
delPartition = new CompactionDelPartition(newDelId);
delFilesToCompact.put(newDelId, delPartition);
}
delPartition.addDelFile(file);
totalDelFiles ++;
} else {
String fileName = linkedFile.getPath().getName();
String date = MobFileName.getDateFromName(fileName);
boolean skipCompaction = MobUtils.fillPartitionId(id, firstDayOfCurrentMonth,
firstDayOfCurrentWeek, date, policy, calendar, mergeableSize);
boolean skipCompaction = MobUtils
.fillPartitionId(id, firstDayOfCurrentMonth, firstDayOfCurrentWeek, date, policy,
calendar, mergeableSize);
if (allFiles || (!skipCompaction && (linkedFile.getLen() < id.getThreshold()))) {
// add all files if allFiles is true,
// otherwise add the small files to the merge pool
@ -209,37 +264,51 @@ public class PartitionedMobCompactor extends MobCompactor {
compactionPartition.getPartitionId().updateLatestDate(date);
}
if (withDelFiles) {
// get startKey and endKey from the file and update partition
// TODO: is it possible to skip read of most hfiles?
Reader reader = HFile.createReader(fs, linkedFile.getPath(), CacheConfig.DISABLED, conf);
try {
compactionPartition.setStartKey(reader.getFirstRowKey());
compactionPartition.setEndKey(reader.getLastRowKey());
} finally {
reader.close();
}
}
selectedFileCount++;
}
}
}
/*
* If it is not a major mob compaction with del files, and the file number in Partition is 1,
* remove the partition from filesToCompact list to avoid re-compacting files which has been
* compacted with del files.
* Merge del files so there are only non-overlapped del file lists
*/
if (!allFiles && (allDelFiles.size() > 0)) {
Iterator<Map.Entry<CompactionPartitionId, CompactionPartition>> it =
filesToCompact.entrySet().iterator();
for(Map.Entry<CompactionDelPartitionId, CompactionDelPartition> entry : delFilesToCompact.entrySet()) {
if (allDelPartitions.size() > 0) {
// check if the current key range overlaps the previous one
CompactionDelPartition prev = allDelPartitions.get(allDelPartitions.size() - 1);
if (Bytes.compareTo(prev.getId().getEndKey(), entry.getKey().getStartKey()) >= 0) {
// merge them together
prev.getId().setEndKey(entry.getValue().getId().getEndKey());
prev.addDelFileList(entry.getValue().listDelFiles());
while(it.hasNext()) {
Map.Entry<CompactionPartitionId, CompactionPartition> entry = it.next();
if (entry.getValue().getFileCount() == 1) {
it.remove();
--selectedFileCount;
} else {
allDelPartitions.add(entry.getValue());
}
} else {
allDelPartitions.add(entry.getValue());
}
}
PartitionedMobCompactionRequest request = new PartitionedMobCompactionRequest(
filesToCompact.values(), allDelFiles);
if (candidates.size() == (allDelFiles.size() + selectedFileCount + irrelevantFileCount)) {
filesToCompact.values(), allDelPartitions);
if (candidates.size() == (totalDelFiles + selectedFileCount + irrelevantFileCount)) {
// all the files are selected
request.setCompactionType(CompactionType.ALL_FILES);
}
LOG.info("The compaction type is " + request.getCompactionType() + ", the request has "
+ allDelFiles.size() + " del files, " + selectedFileCount + " selected files, and "
+ totalDelFiles + " del files, " + selectedFileCount + " selected files, and "
+ irrelevantFileCount + " irrelevant files");
return request;
}
@ -257,51 +326,139 @@ public class PartitionedMobCompactor extends MobCompactor {
*/
protected List<Path> performCompaction(PartitionedMobCompactionRequest request)
throws IOException {
// merge the del files
List<Path> delFilePaths = new ArrayList<>();
for (FileStatus delFile : request.delFiles) {
delFilePaths.add(delFile.getPath());
// merge the del files, it is per del partition
for (CompactionDelPartition delPartition : request.getDelPartitions()) {
if (delPartition.getDelFileCount() <= 1) continue;
List<Path> newDelPaths = compactDelFiles(request, delPartition.listDelFiles());
delPartition.cleanDelFiles();
delPartition.addDelFileList(newDelPaths);
}
List<Path> newDelPaths = compactDelFiles(request, delFilePaths);
List<StoreFile> newDelFiles = new ArrayList<>();
List<Path> paths = null;
int totalDelFileCount = 0;
try {
for (Path newDelPath : newDelPaths) {
StoreFile sf = new StoreFile(fs, newDelPath, conf, compactionCacheConfig, BloomType.NONE);
// pre-create reader of a del file to avoid race condition when opening the reader in each
// partition.
sf.createReader();
newDelFiles.add(sf);
for (CompactionDelPartition delPartition : request.getDelPartitions()) {
for (Path newDelPath : delPartition.listDelFiles()) {
StoreFile sf = new StoreFile(fs, newDelPath, conf, compactionCacheConfig, BloomType.NONE);
// pre-create reader of a del file to avoid race condition when opening the reader in each
// partition.
sf.createReader();
delPartition.addStoreFile(sf);
totalDelFileCount++;
}
}
LOG.info("After merging, there are " + newDelFiles.size() + " del files");
LOG.info("After merging, there are " + totalDelFileCount + " del files");
// compact the mob files by partitions.
paths = compactMobFiles(request, newDelFiles);
paths = compactMobFiles(request);
LOG.info("After compaction, there are " + paths.size() + " mob files");
} finally {
closeStoreFileReaders(newDelFiles);
for (CompactionDelPartition delPartition : request.getDelPartitions()) {
closeStoreFileReaders(delPartition.getStoreFiles());
}
}
// archive the del files if all the mob files are selected.
if (request.type == CompactionType.ALL_FILES && !newDelPaths.isEmpty()) {
if (request.type == CompactionType.ALL_FILES && !request.getDelPartitions().isEmpty()) {
LOG.info(
"After a mob compaction with all files selected, archiving the del files " + newDelPaths);
try {
MobUtils.removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(), newDelFiles);
} catch (IOException e) {
LOG.error("Failed to archive the del files " + newDelPaths, e);
"After a mob compaction with all files selected, archiving the del files ");
for (CompactionDelPartition delPartition : request.getDelPartitions()) {
LOG.info(delPartition.listDelFiles());
try {
MobUtils.removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(), delPartition.getStoreFiles());
} catch (IOException e) {
LOG.error("Failed to archive the del files " + delPartition.getStoreFiles(), e);
}
}
}
return paths;
}
/**
 * Orders delete partitions by a single key of their id: the start key when compareStartKey
 * is true, the end key otherwise. The mode can be switched on an existing instance.
 */
static class DelPartitionComparator implements Comparator<CompactionDelPartition> {
  private boolean compareStartKey;

  DelPartitionComparator(boolean compareStartKey) {
    this.compareStartKey = compareStartKey;
  }

  /**
   * @return true if start keys are compared, false if end keys are compared.
   */
  public boolean getCompareStartKey() {
    return compareStartKey;
  }

  /**
   * @param compareStartKey true to compare start keys, false to compare end keys.
   */
  public void setCompareStartKey(final boolean compareStartKey) {
    this.compareStartKey = compareStartKey;
  }

  @Override
  public int compare(CompactionDelPartition o1, CompactionDelPartition o2) {
    final CompactionDelPartitionId leftId = o1.getId();
    final CompactionDelPartitionId rightId = o2.getId();
    return compareStartKey
        ? Bytes.compareTo(leftId.getStartKey(), rightId.getStartKey())
        : Bytes.compareTo(leftId.getEndKey(), rightId.getEndKey());
  }
}
/**
 * Collects the store files of every delete partition whose key range overlaps the key range
 * of the given compaction partition.
 * Two binary searches are run over delPartitions: one on end keys to locate the first
 * candidate, one on start keys to locate the last candidate.
 * NOTE(review): the binary searches presume delPartitions is sorted and non-overlapping;
 * confirm against the caller that builds the list.
 * @param partition The compaction partition to look up.
 * @param delPartitions The delete partitions to search.
 * @return The store files of the overlapping delete partitions, empty if there is no overlap.
 */
@VisibleForTesting
List<StoreFile> getListOfDelFilesForPartition(final CompactionPartition partition,
    final List<CompactionDelPartition> delPartitions) {
  final List<StoreFile> overlapping = new ArrayList<>();

  // Probe on end keys: where would a delete partition ending at the partition's start key go?
  final DelPartitionComparator keyComparator = new DelPartitionComparator(false);
  final CompactionDelPartitionId probeId =
      new CompactionDelPartitionId(null, partition.getStartKey());
  final CompactionDelPartition probe = new CompactionDelPartition(probeId);

  int first = Collections.binarySearch(delPartitions, probe, keyComparator);
  if (first < 0) {
    // No exact hit: turn the encoded result into the insertion point.
    first = -(first + 1);
    if (first == delPartitions.size()) {
      // Every delete partition ends before this partition starts.
      return overlapping;
    }
    final byte[] candidateStart = delPartitions.get(first).getId().getStartKey();
    if (Bytes.compareTo(partition.getEndKey(), candidateStart) < 0) {
      // The first candidate starts after this partition ends.
      return overlapping;
    }
  }

  // Probe on start keys: where would a delete partition starting at the partition's end key go?
  keyComparator.setCompareStartKey(true);
  probeId.setStartKey(partition.getEndKey());
  int last = Collections.binarySearch(delPartitions, probe, keyComparator);
  if (last < 0) {
    last = -(last + 1);
    if (last == 0) {
      // Every delete partition starts after this partition ends.
      return overlapping;
    }
    // Step back to the last delete partition that starts before the partition's end key.
    --last;
    final byte[] candidateEnd = delPartitions.get(last).getId().getEndKey();
    if (Bytes.compareTo(partition.getStartKey(), candidateEnd) > 0) {
      return overlapping;
    }
  }

  for (int index = first; index <= last; ++index) {
    overlapping.addAll(delPartitions.get(index).getStoreFiles());
  }
  return overlapping;
}
/**
* Compacts the selected small mob files and all the del files.
* @param request The compaction request.
* @param delFiles The del files.
* @return The paths of new mob files after compactions.
* @throws IOException if IO failure is encountered
*/
protected List<Path> compactMobFiles(final PartitionedMobCompactionRequest request,
final List<StoreFile> delFiles) throws IOException {
protected List<Path> compactMobFiles(final PartitionedMobCompactionRequest request)
throws IOException {
Collection<CompactionPartition> partitions = request.compactionPartitions;
if (partitions == null || partitions.isEmpty()) {
LOG.info("No partitions of mob files");
@ -310,10 +467,19 @@ public class PartitionedMobCompactor extends MobCompactor {
List<Path> paths = new ArrayList<>();
final Connection c = ConnectionFactory.createConnection(conf);
final Table table = c.getTable(tableName);
try {
Map<CompactionPartitionId, Future<List<Path>>> results = new HashMap<>();
// compact the mob files by partitions in parallel.
for (final CompactionPartition partition : partitions) {
// How to efficiently come up a list of delFiles for one partition?
// Search the delPartitions and collect all the delFiles for the partition
// One optimization can do is that if there is no del file, we do not need to
// come up with startKey/endKey.
List<StoreFile> delFiles = getListOfDelFilesForPartition(partition,
request.getDelPartitions());
results.put(partition.getPartitionId(), pool.submit(new Callable<List<Path>>() {
@Override
public List<Path> call() throws Exception {

View File

@ -47,6 +47,7 @@ import org.apache.hadoop.hbase.mob.MobConstants;
import org.apache.hadoop.hbase.mob.MobFileName;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.mob.compactions.MobCompactionRequest.CompactionType;
import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.CompactionDelPartition;
import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.CompactionPartition;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
@ -68,12 +69,13 @@ public class TestPartitionedMobCompactor {
private final static String family = "family";
private final static String qf = "qf";
private final long DAY_IN_MS = 1000 * 60 * 60 * 24;
private static byte[] KEYS = Bytes.toBytes("012");
private HColumnDescriptor hcd = new HColumnDescriptor(family);
private Configuration conf = TEST_UTIL.getConfiguration();
private CacheConfig cacheConf = new CacheConfig(conf);
private FileSystem fs;
private List<FileStatus> mobFiles = new ArrayList<>();
private List<FileStatus> delFiles = new ArrayList<>();
private List<Path> delFiles = new ArrayList<>();
private List<FileStatus> allFiles = new ArrayList<>();
private Path basePath;
private String mobSuffix;
@ -106,6 +108,9 @@ public class TestPartitionedMobCompactor {
basePath = new Path(new Path(mobTestDir, tableName), family);
mobSuffix = UUID.randomUUID().toString().replaceAll("-", "");
delSuffix = UUID.randomUUID().toString().replaceAll("-", "") + "_del";
allFiles.clear();
mobFiles.clear();
delFiles.clear();
}
@Test
@ -220,15 +225,6 @@ public class TestPartitionedMobCompactor {
CompactionType.ALL_FILES, false, false);
}
@Test
public void testCompactionSelectToAvoidCompactOneFileWithDelete() throws Exception {
  // A partition that holds a single file is not compacted together with the _del files, so
  // the request is expected to be CompactionType.PART_FILES rather than ALL_FILES, and the
  // expected compact file count is 0.
  final String table = "testCompactionSelectToAvoidCompactOneFileWithDelete";
  testCompactionAtMergeSize(
      table,
      MobConstants.DEFAULT_MOB_COMPACTION_MERGEABLE_THRESHOLD,
      CompactionType.PART_FILES,
      false);
}
@Test
public void testCompactionSelectWithPartFiles() throws Exception {
String tableName = "testCompactionSelectWithPartFiles";
@ -383,6 +379,239 @@ public class TestPartitionedMobCompactor {
}
}
/**
 * Creates multiple mob files, one per byte of KEYS.
 * The n-th file is named after the int n, today's date and mobSuffix, and holds 10 Put cells
 * whose rows are the KEYS byte followed by 0..9, each with 5000 bytes of random data.
 * @param basePath The directory the mob files are written to.
 * @throws IOException if a mob file cannot be written
 */
private void createMobFile(Path basePath) throws IOException {
  final HFileContext fileContext = new HFileContextBuilder().withBlockSize(8 * 1024).build();
  final Date today = new Date();

  for (int fileIndex = 0; fileIndex < KEYS.length; fileIndex++) {
    final byte rowPrefix = KEYS[fileIndex];
    final MobFileName fileName =
        MobFileName.create(Bytes.toBytes(fileIndex), MobUtils.formatDate(today), mobSuffix);
    final Path filePath = new Path(basePath, fileName.getFileName());

    final StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, fs)
        .withFileContext(fileContext)
        .withFilePath(filePath)
        .build();

    final long now = System.currentTimeMillis();
    try {
      for (int rowSuffix = 0; rowSuffix < 10; rowSuffix++) {
        final byte[] row = Bytes.add(Bytes.toBytes(rowPrefix), Bytes.toBytes(rowSuffix));
        final byte[] dummyData = new byte[5000];
        new Random().nextBytes(dummyData);
        final KeyValue put = new KeyValue(row, Bytes.toBytes(family), Bytes.toBytes(qf), now,
            Type.Put, dummyData);
        writer.append(put);
      }
    } finally {
      writer.close();
    }
  }
}
/**
 * Creates one mob delete file.
 * The file is named after the int startKey, today's date and delSuffix, and holds three
 * Delete cells whose rows are KEYS[startKey] followed by 0, 2 and 4.
 * @param basePath The directory the del file is written to.
 * @param startKey The index into KEYS that selects the row prefix of the deletes.
 * @throws IOException if the del file cannot be written
 */
private void createMobDelFile(Path basePath, int startKey) throws IOException {
  final HFileContext fileContext = new HFileContextBuilder().withBlockSize(8 * 1024).build();
  final Date today = new Date();
  final MobFileName delFileName =
      MobFileName.create(Bytes.toBytes(startKey), MobUtils.formatDate(today), delSuffix);
  final Path delFilePath = new Path(basePath, delFileName.getFileName());

  final StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, fs)
      .withFileContext(fileContext)
      .withFilePath(delFilePath)
      .build();

  final long now = System.currentTimeMillis();
  try {
    // The same random payload is shared by all three delete markers.
    final byte[] dummyData = new byte[5000];
    new Random().nextBytes(dummyData);
    for (int rowSuffix = 0; rowSuffix <= 4; rowSuffix += 2) {
      final byte[] row = Bytes.add(Bytes.toBytes(KEYS[startKey]), Bytes.toBytes(rowSuffix));
      writer.append(new KeyValue(row, Bytes.toBytes(family), Bytes.toBytes(qf), now,
          Type.Delete, dummyData));
    }
  } finally {
    writer.close();
  }
}
@Test
public void testCompactFilesWithoutDelFile() throws Exception {
  final String table = "testCompactFilesWithoutDelFile";
  resetConf();
  init(table);

  // Only mob files are created, no del file.
  createMobFile(basePath);
  listFiles();

  final PartitionedMobCompactor compactor = new PartitionedMobCompactor(conf, fs,
      TableName.valueOf(table), hcd, pool) {
    @Override
    public List<Path> compact(List<FileStatus> files, boolean isForceAllFiles)
        throws IOException {
      if (files != null && !files.isEmpty()) {
        final PartitionedMobCompactionRequest request = select(files, isForceAllFiles);

        // Without del files the selection must not produce any del partition.
        Assert.assertTrue(request.getDelPartitions().isEmpty());

        // Without del files no startKey/endKey is recorded for the partitions.
        for (CompactionPartition selected : request.getCompactionPartitions()) {
          final byte[] firstKey = selected.getStartKey();
          Assert.assertTrue(firstKey == null);
          final byte[] lastKey = selected.getEndKey();
          Assert.assertTrue(lastKey == null);
        }
      }
      return null;
    }
  };

  compactor.compact(allFiles, true);
}
/**
 * A compactor for tests whose compact() does not compact anything: it runs select() and then
 * verifies the selected request.
 * It checks the number of del partitions, that the compaction partitions carry a start key
 * and an end key when del partitions exist, that consecutive del partitions do not overlap,
 * and the number of del files that getListOfDelFilesForPartition hands to the partitions.
 */
static class MyPartitionedMobCompactor extends PartitionedMobCompactor {
  // Expected number of del partitions in the selected request.
  int delPartitionSize = 0;
  // Expected total number of del files returned for the compaction partitions.
  int PartitionsIncludeDelFiles = 0;
  // Cache config used when opening the del files as store files.
  CacheConfig cacheConfig = null;

  MyPartitionedMobCompactor(Configuration conf, FileSystem fs, TableName tableName,
      HColumnDescriptor column, ExecutorService pool, final int delPartitionSize,
      final CacheConfig cacheConf, final int PartitionsIncludeDelFiles)
      throws IOException {
    super(conf, fs, tableName, column, pool);
    this.delPartitionSize = delPartitionSize;
    this.cacheConfig = cacheConf;
    this.PartitionsIncludeDelFiles = PartitionsIncludeDelFiles;
  }

  /**
   * Selects the files and asserts on the resulting request instead of compacting.
   * @param files The candidate files.
   * @param isForceAllFiles Whether all the files are forced into the selection.
   * @return Always null.
   * @throws IOException if the selection or opening a del file fails
   */
  @Override
  public List<Path> compact(List<FileStatus> files, boolean isForceAllFiles)
      throws IOException {
    if (files == null || files.isEmpty()) {
      return null;
    }
    PartitionedMobCompactionRequest request = select(files, isForceAllFiles);

    Assert.assertEquals(delPartitionSize, request.getDelPartitions().size());
    if (request.getDelPartitions().size() > 0) {
      for (CompactionPartition p : request.getCompactionPartitions()) {
        Assert.assertTrue(p.getStartKey() != null);
        Assert.assertTrue(p.getEndKey() != null);
      }
    }

    try {
      for (CompactionDelPartition delPartition : request.getDelPartitions()) {
        for (Path newDelPath : delPartition.listDelFiles()) {
          StoreFile sf = new StoreFile(fs, newDelPath, conf, this.cacheConfig, BloomType.NONE);
          // pre-create reader of a del file to avoid race condition when opening the reader in
          // each partition.
          sf.createReader();
          delPartition.addStoreFile(sf);
        }
      }

      // Make sure that CompactionDelPartitions does not overlap
      CompactionDelPartition prevDelP = null;
      for (CompactionDelPartition delP : request.getDelPartitions()) {
        Assert.assertTrue(
            Bytes.compareTo(delP.getId().getStartKey(), delP.getId().getEndKey()) <= 0);

        if (prevDelP != null) {
          Assert.assertTrue(
              Bytes.compareTo(prevDelP.getId().getEndKey(), delP.getId().getStartKey()) < 0);
        }
        // Remember this partition; without this the overlap check above never executes.
        prevDelP = delP;
      }

      int affectedPartitions = 0;

      // Make sure that only del files within key range for a partition is included in
      // compaction.
      for (CompactionPartition partition : request.getCompactionPartitions()) {
        List<StoreFile> delFiles =
            getListOfDelFilesForPartition(partition, request.getDelPartitions());
        if (!request.getDelPartitions().isEmpty()) {
          // Only look at partitions that are not entirely before the first del partition or
          // entirely after the last one.
          if (!((Bytes.compareTo(request.getDelPartitions().get(0).getId().getStartKey(),
              partition.getEndKey()) > 0) || (Bytes.compareTo(
              request.getDelPartitions().get(request.getDelPartitions().size() - 1).getId()
                  .getEndKey(), partition.getStartKey()) < 0))) {
            if (delFiles.size() > 0) {
              Assert.assertTrue(delFiles.size() == 1);
              affectedPartitions += delFiles.size();
              Assert.assertTrue(Bytes.compareTo(partition.getStartKey(),
                  CellUtil.cloneRow(delFiles.get(0).getLastKey())) <= 0);
              Assert.assertTrue(Bytes.compareTo(partition.getEndKey(),
                  CellUtil.cloneRow(delFiles.get(delFiles.size() - 1).getFirstKey())) >= 0);
            }
          }
        }
      }
      // The del file is only included in one partition
      Assert.assertEquals(PartitionsIncludeDelFiles, affectedPartitions);
    } finally {
      for (CompactionDelPartition delPartition : request.getDelPartitions()) {
        for (StoreFile storeFile : delPartition.getStoreFiles()) {
          try {
            storeFile.closeReader(true);
          } catch (IOException e) {
            LOG.warn("Failed to close the reader on store file " + storeFile.getPath(), e);
          }
        }
      }
    }

    return null;
  }
}
@Test
public void testCompactFilesWithOneDelFile() throws Exception {
  final String table = "testCompactFilesWithOneDelFile";
  resetConf();
  init(table);

  // Create the mob files and a single del file for KEYS[2].
  createMobFile(basePath);
  createMobDelFile(basePath, 2);
  listFiles();

  // Expect one del partition and one del file handed to the compaction partitions.
  final int expectedDelPartitions = 1;
  final int expectedIncludedDelFiles = 1;
  final MyPartitionedMobCompactor compactor = new MyPartitionedMobCompactor(conf, fs,
      TableName.valueOf(table), hcd, pool, expectedDelPartitions, cacheConf,
      expectedIncludedDelFiles);
  compactor.compact(allFiles, true);
}
@Test
public void testCompactFilesWithMultiDelFiles() throws Exception {
  final String table = "testCompactFilesWithMultiDelFiles";
  resetConf();
  init(table);

  // Create the mob files and one del file for each of KEYS[0], KEYS[1] and KEYS[2].
  createMobFile(basePath);
  for (int keyIndex = 0; keyIndex < 3; keyIndex++) {
    createMobDelFile(basePath, keyIndex);
  }
  listFiles();

  // Expect three del partitions and three del files handed to the compaction partitions.
  final int expectedDelPartitions = 3;
  final int expectedIncludedDelFiles = 3;
  final MyPartitionedMobCompactor compactor = new MyPartitionedMobCompactor(conf, fs,
      TableName.valueOf(table), hcd, pool, expectedDelPartitions, cacheConf,
      expectedIncludedDelFiles);
  compactor.compact(allFiles, true);
}
private void testCompactDelFilesAtBatchSize(String tableName, int batchSize,
int delfileMaxCount) throws Exception {
@ -419,12 +648,53 @@ public class TestPartitionedMobCompactor {
return null;
}
PartitionedMobCompactionRequest request = select(files, isForceAllFiles);
// Make sure that when there is no del files, there will be no startKey/endKey for partition.
if (request.getDelPartitions().size() == 0) {
for (CompactionPartition p : request.getCompactionPartitions()) {
Assert.assertTrue(p.getStartKey() == null);
Assert.assertTrue(p.getEndKey() == null);
}
}
// Make sure that CompactionDelPartitions does not overlap
CompactionDelPartition prevDelP = null;
for (CompactionDelPartition delP : request.getDelPartitions()) {
Assert.assertTrue(Bytes.compareTo(delP.getId().getStartKey(),
delP.getId().getEndKey()) <= 0);
if (prevDelP != null) {
Assert.assertTrue(Bytes.compareTo(prevDelP.getId().getEndKey(),
delP.getId().getStartKey()) < 0);
}
}
// Make sure that only del files within key range for a partition is included in compaction.
// compact the mob files by partitions in parallel.
for (CompactionPartition partition : request.getCompactionPartitions()) {
List<StoreFile> delFiles = getListOfDelFilesForPartition(partition, request.getDelPartitions());
if (!request.getDelPartitions().isEmpty()) {
if (!((Bytes.compareTo(request.getDelPartitions().get(0).getId().getStartKey(),
partition.getEndKey()) > 0) || (Bytes.compareTo(
request.getDelPartitions().get(request.getDelPartitions().size() - 1).getId()
.getEndKey(), partition.getStartKey()) < 0))) {
if (delFiles.size() > 0) {
Assert.assertTrue(Bytes
.compareTo(partition.getStartKey(), delFiles.get(0).getFirstKey().getRowArray())
>= 0);
Assert.assertTrue(Bytes.compareTo(partition.getEndKey(),
delFiles.get(delFiles.size() - 1).getLastKey().getRowArray()) <= 0);
}
}
}
}
// assert the compaction type
Assert.assertEquals(type, request.type);
// assert get the right partitions
compareCompactedPartitions(expected, request.compactionPartitions);
// assert get the right del files
compareDelFiles(request.delFiles);
compareDelFiles(request.getDelPartitions());
return null;
}
};
@ -446,8 +716,10 @@ public class TestPartitionedMobCompactor {
protected List<Path> performCompaction(PartitionedMobCompactionRequest request)
throws IOException {
List<Path> delFilePaths = new ArrayList<Path>();
for (FileStatus delFile : request.delFiles) {
delFilePaths.add(delFile.getPath());
for (CompactionDelPartition delPartition: request.getDelPartitions()) {
for (Path p : delPartition.listDelFiles()) {
delFilePaths.add(p);
}
}
List<Path> newDelPaths = compactDelFiles(request, delFilePaths);
// assert the del files are merged.
@ -466,7 +738,7 @@ public class TestPartitionedMobCompactor {
for (FileStatus file : fs.listStatus(basePath)) {
allFiles.add(file);
if (file.getPath().getName().endsWith("_del")) {
delFiles.add(file);
delFiles.add(file.getPath());
} else {
mobFiles.add(file);
}
@ -493,13 +765,18 @@ public class TestPartitionedMobCompactor {
/**
* Compares the del files.
* @param allDelFiles all the del files
* @param delPartitions all del partitions
*/
private void compareDelFiles(Collection<FileStatus> allDelFiles) {
private void compareDelFiles(List<CompactionDelPartition> delPartitions) {
int i = 0;
for (FileStatus file : allDelFiles) {
Assert.assertEquals(delFiles.get(i), file);
i++;
Map<Path, Path> delMap = new HashMap<>();
for (CompactionDelPartition delPartition : delPartitions) {
for (Path f : delPartition.listDelFiles()) {
delMap.put(f, f);
}
}
for (Path f : delFiles) {
Assert.assertTrue(delMap.containsKey(f));
}
}