diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactionRequest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactionRequest.java index 33351491a2c..3292d997aeb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactionRequest.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactionRequest.java @@ -24,8 +24,11 @@ import java.util.Collections; import java.util.List; import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.mob.MobConstants; +import org.apache.hadoop.hbase.regionserver.StoreFile; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; /** @@ -37,14 +40,14 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @InterfaceAudience.Private public class PartitionedMobCompactionRequest extends MobCompactionRequest { - protected Collection delFiles; + protected List delPartitions; protected Collection compactionPartitions; public PartitionedMobCompactionRequest(Collection compactionPartitions, - Collection delFiles) { + List delPartitions) { this.selectionTime = EnvironmentEdgeManager.currentTime(); this.compactionPartitions = compactionPartitions; - this.delFiles = delFiles; + this.delPartitions = delPartitions; } /** @@ -59,8 +62,8 @@ public class PartitionedMobCompactionRequest extends MobCompactionRequest { * Gets the del files. * @return The del files. 
*/ - public Collection getDelFiles() { - return this.delFiles; + public List getDelPartitions() { + return this.delPartitions; } /** @@ -72,6 +75,10 @@ public class PartitionedMobCompactionRequest extends MobCompactionRequest { private List files = new ArrayList(); private CompactionPartitionId partitionId; + // The startKey and endKey of this partition, both are inclusive. + private byte[] startKey; + private byte[] endKey; + public CompactionPartition(CompactionPartitionId partitionId) { this.partitionId = partitionId; } @@ -91,6 +98,35 @@ public class PartitionedMobCompactionRequest extends MobCompactionRequest { public int getFileCount () { return files.size(); } + + public byte[] getStartKey() { + return startKey; + } + + /** + * Set start key of this partition, only if the input startKey is less than + * the current start key. + */ + public void setStartKey(final byte[] startKey) + { + if ((this.startKey == null) || (Bytes.compareTo(startKey, this.startKey) < 0)) { + this.startKey = startKey; + } + } + + public byte[] getEndKey() { + return endKey; + } + + /** + * Set end key of this partition, only if the input endKey is greater than + * the current end key. + */ + public void setEndKey(final byte[] endKey) { + if ((this.endKey == null) || (Bytes.compareTo(endKey, this.endKey) > 0)) { + this.endKey = endKey; + } + } } /** @@ -183,4 +219,116 @@ public class PartitionedMobCompactionRequest extends MobCompactionRequest { return new StringBuilder(startKey).append(date).toString(); } } + + /** + * The delete file partition in the mob compaction. + * The delete partition is defined as [startKey, endKey] pair. + * The mob delete files that have the same start key and end key belong to + * the same partition. 
+ */ + protected static class CompactionDelPartition { + private List delFiles = new ArrayList(); + private List storeFiles = new ArrayList<>(); + private CompactionDelPartitionId id; + + public CompactionDelPartition(CompactionDelPartitionId id) { + this.id = id; + } + + public CompactionDelPartitionId getId() { + return this.id; + } + + void addDelFile(FileStatus file) { + delFiles.add(file.getPath()); + } + public void addStoreFile(final StoreFile file) { + storeFiles.add(file); + } + + public List getStoreFiles() { + return storeFiles; + } + + List listDelFiles() { + return Collections.unmodifiableList(delFiles); + } + + void addDelFileList(final Collection list) { + delFiles.addAll(list); + } + + int getDelFileCount () { + return delFiles.size(); + } + + void cleanDelFiles() { + delFiles.clear(); + } + } + + /** + * The delete partition id that consists of start key and end key + */ + public static class CompactionDelPartitionId implements Comparable { + private byte[] startKey; + private byte[] endKey; + + public CompactionDelPartitionId() { + } + + public CompactionDelPartitionId(final byte[] startKey, final byte[] endKey) { + this.startKey = startKey; + this.endKey = endKey; + } + + public byte[] getStartKey() { + return this.startKey; + } + public void setStartKey(final byte[] startKey) { + this.startKey = startKey; + } + + public byte[] getEndKey() { + return this.endKey; + } + public void setEndKey(final byte[] endKey) { + this.endKey = endKey; + } + + public int compareTo(CompactionDelPartitionId o) { + /* + * 1). Compare the start key, if the k1 < k2, then k1 is less + * 2). If start Key is same, check endKey, k1 < k2, k1 is less + * If both are same, then they are equal. 
+ */ + int result = Bytes.compareTo(this.startKey, o.getStartKey()); + if (result != 0) { + return result; + } + + return Bytes.compareTo(this.endKey, o.getEndKey()); + } + + @Override + public int hashCode() { + int result = 17; + result = 31 * result + java.util.Arrays.hashCode(startKey); + result = 31 * result + java.util.Arrays.hashCode(endKey); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (!(obj instanceof CompactionDelPartitionId)) { + return false; + } + CompactionDelPartitionId another = (CompactionDelPartitionId) obj; + + return (this.compareTo(another) == 0); + } + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java index a0823d747fa..b6eb6409acf 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java @@ -24,16 +24,19 @@ import java.util.ArrayList; import java.util.Calendar; import java.util.Collection; import java.util.Collections; +import java.util.Comparator; import java.util.Date; import java.util.HashMap; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.NavigableMap; +import java.util.TreeMap; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -58,11 +61,15 @@ import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.io.HFileLink; import org.apache.hadoop.hbase.io.crypto.Encryption; import 
org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.hbase.io.hfile.HFile; +import org.apache.hadoop.hbase.io.hfile.HFile.Reader; import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; import org.apache.hadoop.hbase.mob.MobConstants; import org.apache.hadoop.hbase.mob.MobFileName; import org.apache.hadoop.hbase.mob.MobUtils; import org.apache.hadoop.hbase.mob.compactions.MobCompactionRequest.CompactionType; +import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.CompactionDelPartition; +import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.CompactionDelPartitionId; import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.CompactionPartition; import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.CompactionPartitionId; import org.apache.hadoop.hbase.regionserver.BloomType; @@ -132,6 +139,7 @@ public class PartitionedMobCompactor extends MobCompactor { return null; } LOG.info("is allFiles: " + allFiles); + // find the files to compact. PartitionedMobCompactionRequest request = select(files, allFiles); // compact the files. 
@@ -148,11 +156,14 @@ public class PartitionedMobCompactor extends MobCompactor { */ protected PartitionedMobCompactionRequest select(List candidates, boolean allFiles) throws IOException { - final Collection allDelFiles = new ArrayList<>(); final Map filesToCompact = new HashMap<>(); final CompactionPartitionId id = new CompactionPartitionId(); + final NavigableMap delFilesToCompact = new TreeMap<>(); + final CompactionDelPartitionId delId = new CompactionDelPartitionId(); + final ArrayList allDelPartitions = new ArrayList<>(); int selectedFileCount = 0; int irrelevantFileCount = 0; + int totalDelFiles = 0; MobCompactPartitionPolicy policy = column.getMobCompactPartitionPolicy(); Calendar calendar = Calendar.getInstance(); @@ -167,6 +178,31 @@ public class PartitionedMobCompactor extends MobCompactor { firstDayOfCurrentWeek = MobUtils.getFirstDayOfWeek(calendar, currentDate); } + // Check up front whether there are any del files so the later processing can be optimized. + // If there are del files, the startKey and endKey of each partition must be read + // from its files. + // If there is no del file, there is no need to read startKey and endKey from files; this + // is an optimization. + boolean withDelFiles = false; + for (FileStatus file : candidates) { + if (!file.isFile()) { + continue; + } + // resolve HFileLinks so the del-file check looks at the linked file. 
+ FileStatus linkedFile = file; + if (HFileLink.isHFileLink(file.getPath())) { + HFileLink link = HFileLink.buildFromHFileLinkPattern(conf, file.getPath()); + linkedFile = getLinkedFileStatus(link); + if (linkedFile == null) { + continue; + } + } + if (StoreFileInfo.isDelFile(linkedFile.getPath())) { + withDelFiles = true; + break; + } + } + for (FileStatus file : candidates) { if (!file.isFile()) { irrelevantFileCount++; @@ -183,13 +219,32 @@ public class PartitionedMobCompactor extends MobCompactor { continue; } } - if (StoreFileInfo.isDelFile(linkedFile.getPath())) { - allDelFiles.add(file); + if (withDelFiles && StoreFileInfo.isDelFile(linkedFile.getPath())) { + // File in the Del Partition List + + // Get delId from the file + Reader reader = HFile.createReader(fs, linkedFile.getPath(), CacheConfig.DISABLED, conf); + try { + delId.setStartKey(reader.getFirstRowKey()); + delId.setEndKey(reader.getLastRowKey()); + } finally { + reader.close(); + } + CompactionDelPartition delPartition = delFilesToCompact.get(delId); + if (delPartition == null) { + CompactionDelPartitionId newDelId = + new CompactionDelPartitionId(delId.getStartKey(), delId.getEndKey()); + delPartition = new CompactionDelPartition(newDelId); + delFilesToCompact.put(newDelId, delPartition); + } + delPartition.addDelFile(file); + totalDelFiles ++; } else { String fileName = linkedFile.getPath().getName(); String date = MobFileName.getDateFromName(fileName); - boolean skipCompaction = MobUtils.fillPartitionId(id, firstDayOfCurrentMonth, - firstDayOfCurrentWeek, date, policy, calendar, mergeableSize); + boolean skipCompaction = MobUtils + .fillPartitionId(id, firstDayOfCurrentMonth, firstDayOfCurrentWeek, date, policy, + calendar, mergeableSize); if (allFiles || (!skipCompaction && (linkedFile.getLen() < id.getThreshold()))) { // add all files if allFiles is true, // otherwise add the small files to the merge pool @@ -209,37 +264,51 @@ public class PartitionedMobCompactor extends MobCompactor { 
compactionPartition.getPartitionId().updateLatestDate(date); } + if (withDelFiles) { + // get startKey and endKey from the file and update partition + // TODO: is it possible to skip read of most hfiles? + Reader reader = HFile.createReader(fs, linkedFile.getPath(), CacheConfig.DISABLED, conf); + try { + compactionPartition.setStartKey(reader.getFirstRowKey()); + compactionPartition.setEndKey(reader.getLastRowKey()); + } finally { + reader.close(); + } + } + selectedFileCount++; } } } /* - * If it is not a major mob compaction with del files, and the file number in Partition is 1, - * remove the partition from filesToCompact list to avoid re-compacting files which has been - * compacted with del files. + * Merge del files so there are only non-overlapped del file lists */ - if (!allFiles && (allDelFiles.size() > 0)) { - Iterator> it = - filesToCompact.entrySet().iterator(); + for(Map.Entry entry : delFilesToCompact.entrySet()) { + if (allDelPartitions.size() > 0) { + // check if the current key range overlaps the previous one + CompactionDelPartition prev = allDelPartitions.get(allDelPartitions.size() - 1); + if (Bytes.compareTo(prev.getId().getEndKey(), entry.getKey().getStartKey()) >= 0) { + // merge them together + prev.getId().setEndKey(entry.getValue().getId().getEndKey()); + prev.addDelFileList(entry.getValue().listDelFiles()); - while(it.hasNext()) { - Map.Entry entry = it.next(); - if (entry.getValue().getFileCount() == 1) { - it.remove(); - --selectedFileCount; + } else { + allDelPartitions.add(entry.getValue()); } + } else { + allDelPartitions.add(entry.getValue()); } } PartitionedMobCompactionRequest request = new PartitionedMobCompactionRequest( - filesToCompact.values(), allDelFiles); - if (candidates.size() == (allDelFiles.size() + selectedFileCount + irrelevantFileCount)) { + filesToCompact.values(), allDelPartitions); + if (candidates.size() == (totalDelFiles + selectedFileCount + irrelevantFileCount)) { // all the files are selected 
request.setCompactionType(CompactionType.ALL_FILES); } LOG.info("The compaction type is " + request.getCompactionType() + ", the request has " - + allDelFiles.size() + " del files, " + selectedFileCount + " selected files, and " + + totalDelFiles + " del files, " + selectedFileCount + " selected files, and " + irrelevantFileCount + " irrelevant files"); return request; } @@ -257,51 +326,139 @@ public class PartitionedMobCompactor extends MobCompactor { */ protected List performCompaction(PartitionedMobCompactionRequest request) throws IOException { - // merge the del files - List delFilePaths = new ArrayList<>(); - for (FileStatus delFile : request.delFiles) { - delFilePaths.add(delFile.getPath()); + + // merge the del files, it is per del partition + for (CompactionDelPartition delPartition : request.getDelPartitions()) { + if (delPartition.getDelFileCount() <= 1) continue; + List newDelPaths = compactDelFiles(request, delPartition.listDelFiles()); + delPartition.cleanDelFiles(); + delPartition.addDelFileList(newDelPaths); } - List newDelPaths = compactDelFiles(request, delFilePaths); - List newDelFiles = new ArrayList<>(); + List paths = null; + int totalDelFileCount = 0; try { - for (Path newDelPath : newDelPaths) { - StoreFile sf = new StoreFile(fs, newDelPath, conf, compactionCacheConfig, BloomType.NONE); - // pre-create reader of a del file to avoid race condition when opening the reader in each - // partition. - sf.createReader(); - newDelFiles.add(sf); + for (CompactionDelPartition delPartition : request.getDelPartitions()) { + for (Path newDelPath : delPartition.listDelFiles()) { + StoreFile sf = new StoreFile(fs, newDelPath, conf, compactionCacheConfig, BloomType.NONE); + // pre-create reader of a del file to avoid race condition when opening the reader in each + // partition. 
+ sf.createReader(); + delPartition.addStoreFile(sf); + totalDelFileCount++; + } } - LOG.info("After merging, there are " + newDelFiles.size() + " del files"); + LOG.info("After merging, there are " + totalDelFileCount + " del files"); // compact the mob files by partitions. - paths = compactMobFiles(request, newDelFiles); + paths = compactMobFiles(request); LOG.info("After compaction, there are " + paths.size() + " mob files"); } finally { - closeStoreFileReaders(newDelFiles); + for (CompactionDelPartition delPartition : request.getDelPartitions()) { + closeStoreFileReaders(delPartition.getStoreFiles()); + } } + // archive the del files if all the mob files are selected. - if (request.type == CompactionType.ALL_FILES && !newDelPaths.isEmpty()) { + if (request.type == CompactionType.ALL_FILES && !request.getDelPartitions().isEmpty()) { LOG.info( - "After a mob compaction with all files selected, archiving the del files " + newDelPaths); - try { - MobUtils.removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(), newDelFiles); - } catch (IOException e) { - LOG.error("Failed to archive the del files " + newDelPaths, e); + "After a mob compaction with all files selected, archiving the del files "); + for (CompactionDelPartition delPartition : request.getDelPartitions()) { + LOG.info(delPartition.listDelFiles()); + try { + MobUtils.removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(), delPartition.getStoreFiles()); + } catch (IOException e) { + LOG.error("Failed to archive the del files " + delPartition.getStoreFiles(), e); + } } } return paths; } + static class DelPartitionComparator implements Comparator { + private boolean compareStartKey; + + DelPartitionComparator(boolean compareStartKey) { + this.compareStartKey = compareStartKey; + } + + public boolean getCompareStartKey() { + return this.compareStartKey; + } + + public void setCompareStartKey(final boolean compareStartKey) { + this.compareStartKey = compareStartKey; + } + + @Override + 
public int compare(CompactionDelPartition o1, CompactionDelPartition o2) { + + if (compareStartKey) { + return Bytes.compareTo(o1.getId().getStartKey(), o2.getId().getStartKey()); + } else { + return Bytes.compareTo(o1.getId().getEndKey(), o2.getId().getEndKey()); + } + } + } + + @VisibleForTesting + List getListOfDelFilesForPartition(final CompactionPartition partition, + final List delPartitions) { + // Binary search for startKey and endKey + + List result = new ArrayList<>(); + + DelPartitionComparator comparator = new DelPartitionComparator(false); + CompactionDelPartitionId id = new CompactionDelPartitionId(null, partition.getStartKey()); + CompactionDelPartition target = new CompactionDelPartition(id); + int start = Collections.binarySearch(delPartitions, target, comparator); + + // Get the start index for partition + if (start < 0) { + // Calculate the insert point + start = (start + 1) * (-1); + if (start == delPartitions.size()) { + // no overlap + return result; + } else { + // Check another case which has no overlap + if (Bytes.compareTo(partition.getEndKey(), delPartitions.get(start).getId().getStartKey()) < 0) { + return result; + } + } + } + + // Search for end index for the partition + comparator.setCompareStartKey(true); + id.setStartKey(partition.getEndKey()); + int end = Collections.binarySearch(delPartitions, target, comparator); + + if (end < 0) { + end = (end + 1) * (-1); + if (end == 0) { + return result; + } else { + --end; + if (Bytes.compareTo(partition.getStartKey(), delPartitions.get(end).getId().getEndKey()) > 0) { + return result; + } + } + } + + for (int i = start; i <= end; ++i) { + result.addAll(delPartitions.get(i).getStoreFiles()); + } + + return result; + } + /** * Compacts the selected small mob files and all the del files. * @param request The compaction request. - * @param delFiles The del files. * @return The paths of new mob files after compactions. 
* @throws IOException if IO failure is encountered */ - protected List compactMobFiles(final PartitionedMobCompactionRequest request, - final List delFiles) throws IOException { + protected List compactMobFiles(final PartitionedMobCompactionRequest request) + throws IOException { Collection partitions = request.compactionPartitions; if (partitions == null || partitions.isEmpty()) { LOG.info("No partitions of mob files"); @@ -310,10 +467,19 @@ public class PartitionedMobCompactor extends MobCompactor { List paths = new ArrayList<>(); final Connection c = ConnectionFactory.createConnection(conf); final Table table = c.getTable(tableName); + try { Map>> results = new HashMap<>(); // compact the mob files by partitions in parallel. for (final CompactionPartition partition : partitions) { + + // How to efficiently come up with a list of delFiles for one partition? + // Search the delPartitions and collect all the delFiles for the partition. + // One possible optimization: if there is no del file, we do not need to + // come up with startKey/endKey. 
+ List delFiles = getListOfDelFilesForPartition(partition, + request.getDelPartitions()); + results.put(partition.getPartitionId(), pool.submit(new Callable>() { @Override public List call() throws Exception { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestPartitionedMobCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestPartitionedMobCompactor.java index c34f5580576..3aaf0e4c102 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestPartitionedMobCompactor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestPartitionedMobCompactor.java @@ -47,6 +47,7 @@ import org.apache.hadoop.hbase.mob.MobConstants; import org.apache.hadoop.hbase.mob.MobFileName; import org.apache.hadoop.hbase.mob.MobUtils; import org.apache.hadoop.hbase.mob.compactions.MobCompactionRequest.CompactionType; +import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.CompactionDelPartition; import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.CompactionPartition; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; @@ -68,12 +69,13 @@ public class TestPartitionedMobCompactor { private final static String family = "family"; private final static String qf = "qf"; private final long DAY_IN_MS = 1000 * 60 * 60 * 24; + private static byte[] KEYS = Bytes.toBytes("012"); private HColumnDescriptor hcd = new HColumnDescriptor(family); private Configuration conf = TEST_UTIL.getConfiguration(); private CacheConfig cacheConf = new CacheConfig(conf); private FileSystem fs; private List mobFiles = new ArrayList<>(); - private List delFiles = new ArrayList<>(); + private List delFiles = new ArrayList<>(); private List allFiles = new ArrayList<>(); private Path basePath; private String mobSuffix; @@ -106,6 +108,9 @@ public class TestPartitionedMobCompactor { basePath = new Path(new Path(mobTestDir, 
tableName), family); mobSuffix = UUID.randomUUID().toString().replaceAll("-", ""); delSuffix = UUID.randomUUID().toString().replaceAll("-", "") + "_del"; + allFiles.clear(); + mobFiles.clear(); + delFiles.clear(); } @Test @@ -220,15 +225,6 @@ public class TestPartitionedMobCompactor { CompactionType.ALL_FILES, false, false); } - @Test - public void testCompactionSelectToAvoidCompactOneFileWithDelete() throws Exception { - String tableName = "testCompactionSelectToAvoidCompactOneFileWithDelete"; - // If there is only 1 file, it will not be compacted with _del files, so - // It wont be CompactionType.ALL_FILES in this case, and expected compact file count will be 0. - testCompactionAtMergeSize(tableName, MobConstants.DEFAULT_MOB_COMPACTION_MERGEABLE_THRESHOLD, - CompactionType.PART_FILES, false); - } - @Test public void testCompactionSelectWithPartFiles() throws Exception { String tableName = "testCompactionSelectWithPartFiles"; @@ -383,6 +379,239 @@ public class TestPartitionedMobCompactor { } } + /** + * Create multiple partition files + */ + private void createMobFile(Path basePath) throws IOException { + HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build(); + MobFileName mobFileName = null; + int ii = 0; + Date today = new Date(); + for (byte k0 : KEYS) { + byte[] startRow = Bytes.toBytes(ii++); + + mobFileName = MobFileName.create(startRow, MobUtils.formatDate(today), mobSuffix); + + StoreFileWriter mobFileWriter = + new StoreFileWriter.Builder(conf, cacheConf, fs).withFileContext(meta) + .withFilePath(new Path(basePath, mobFileName.getFileName())).build(); + + long now = System.currentTimeMillis(); + try { + for (int i = 0; i < 10; i++) { + byte[] key = Bytes.add(Bytes.toBytes(k0), Bytes.toBytes(i)); + byte[] dummyData = new byte[5000]; + new Random().nextBytes(dummyData); + mobFileWriter.append( + new KeyValue(key, Bytes.toBytes(family), Bytes.toBytes(qf), now, Type.Put, dummyData)); + } + } finally { + mobFileWriter.close(); + } + } + 
} + + /** + * Create a delete file for the key at the given index in KEYS + */ + private void createMobDelFile(Path basePath, int startKey) throws IOException { + HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build(); + MobFileName mobFileName = null; + Date today = new Date(); + + byte[] startRow = Bytes.toBytes(startKey); + + mobFileName = MobFileName.create(startRow, MobUtils.formatDate(today), delSuffix); + + StoreFileWriter mobFileWriter = + new StoreFileWriter.Builder(conf, cacheConf, fs).withFileContext(meta) + .withFilePath(new Path(basePath, mobFileName.getFileName())).build(); + + long now = System.currentTimeMillis(); + try { + byte[] key = Bytes.add(Bytes.toBytes(KEYS[startKey]), Bytes.toBytes(0)); + byte[] dummyData = new byte[5000]; + new Random().nextBytes(dummyData); + mobFileWriter.append( + new KeyValue(key, Bytes.toBytes(family), Bytes.toBytes(qf), now, Type.Delete, dummyData)); + key = Bytes.add(Bytes.toBytes(KEYS[startKey]), Bytes.toBytes(2)); + mobFileWriter.append( + new KeyValue(key, Bytes.toBytes(family), Bytes.toBytes(qf), now, Type.Delete, dummyData)); + key = Bytes.add(Bytes.toBytes(KEYS[startKey]), Bytes.toBytes(4)); + mobFileWriter.append( + new KeyValue(key, Bytes.toBytes(family), Bytes.toBytes(qf), now, Type.Delete, dummyData)); + + } finally { + mobFileWriter.close(); + } + } + + @Test + public void testCompactFilesWithoutDelFile() throws Exception { + String tableName = "testCompactFilesWithoutDelFile"; + resetConf(); + init(tableName); + + createMobFile(basePath); + + listFiles(); + + PartitionedMobCompactor compactor = new PartitionedMobCompactor(conf, fs, + TableName.valueOf(tableName), hcd, pool) { + @Override + public List compact(List files, boolean isForceAllFiles) + throws IOException { + if (files == null || files.isEmpty()) { + return null; + } + + PartitionedMobCompactionRequest request = select(files, isForceAllFiles); + + // Make sure that there are no del partitions + 
Assert.assertTrue(request.getDelPartitions().size() == 0); + + // Make sure that when there is no startKey/endKey for partition. + for (CompactionPartition p : request.getCompactionPartitions()) { + Assert.assertTrue(p.getStartKey() == null); + Assert.assertTrue(p.getEndKey() == null); + } + return null; + } + }; + + compactor.compact(allFiles, true); + } + + static class MyPartitionedMobCompactor extends PartitionedMobCompactor { + int delPartitionSize = 0; + int PartitionsIncludeDelFiles = 0; + CacheConfig cacheConfig = null; + + MyPartitionedMobCompactor(Configuration conf, FileSystem fs, TableName tableName, + HColumnDescriptor column, ExecutorService pool, final int delPartitionSize, + final CacheConfig cacheConf, final int PartitionsIncludeDelFiles) + throws IOException { + super(conf, fs, tableName, column, pool); + this.delPartitionSize = delPartitionSize; + this.cacheConfig = cacheConf; + this.PartitionsIncludeDelFiles = PartitionsIncludeDelFiles; + } + + @Override public List compact(List files, boolean isForceAllFiles) + throws IOException { + if (files == null || files.isEmpty()) { + return null; + } + PartitionedMobCompactionRequest request = select(files, isForceAllFiles); + + Assert.assertTrue(request.getDelPartitions().size() == delPartitionSize); + if (request.getDelPartitions().size() > 0) { + for (CompactionPartition p : request.getCompactionPartitions()) { + Assert.assertTrue(p.getStartKey() != null); + Assert.assertTrue(p.getEndKey() != null); + } + } + + try { + for (CompactionDelPartition delPartition : request.getDelPartitions()) { + for (Path newDelPath : delPartition.listDelFiles()) { + StoreFile sf = new StoreFile(fs, newDelPath, conf, this.cacheConfig, BloomType.NONE); + // pre-create reader of a del file to avoid race condition when opening the reader in each + // partition. 
+ sf.createReader(); + delPartition.addStoreFile(sf); + } + } + + // Make sure that CompactionDelPartitions does not overlap + CompactionDelPartition prevDelP = null; + for (CompactionDelPartition delP : request.getDelPartitions()) { + Assert.assertTrue( + Bytes.compareTo(delP.getId().getStartKey(), delP.getId().getEndKey()) <= 0); + + if (prevDelP != null) { + Assert.assertTrue( + Bytes.compareTo(prevDelP.getId().getEndKey(), delP.getId().getStartKey()) < 0); + } + } + + int affectedPartitions = 0; + + // Make sure that only del files within key range for a partition is included in compaction. + // compact the mob files by partitions in parallel. + for (CompactionPartition partition : request.getCompactionPartitions()) { + List delFiles = getListOfDelFilesForPartition(partition, request.getDelPartitions()); + if (!request.getDelPartitions().isEmpty()) { + if (!((Bytes.compareTo(request.getDelPartitions().get(0).getId().getStartKey(), + partition.getEndKey()) > 0) || (Bytes.compareTo( + request.getDelPartitions().get(request.getDelPartitions().size() - 1).getId() + .getEndKey(), partition.getStartKey()) < 0))) { + + if (delFiles.size() > 0) { + Assert.assertTrue(delFiles.size() == 1); + affectedPartitions += delFiles.size(); + Assert.assertTrue(Bytes.compareTo(partition.getStartKey(), + CellUtil.cloneRow(delFiles.get(0).getLastKey())) <= 0); + Assert.assertTrue(Bytes.compareTo(partition.getEndKey(), + CellUtil.cloneRow(delFiles.get(delFiles.size() - 1).getFirstKey())) >= 0); + } + } + } + } + // The del file is only included in one partition + Assert.assertTrue(affectedPartitions == PartitionsIncludeDelFiles); + } finally { + for (CompactionDelPartition delPartition : request.getDelPartitions()) { + for (StoreFile storeFile : delPartition.getStoreFiles()) { + try { + storeFile.closeReader(true); + } catch (IOException e) { + LOG.warn("Failed to close the reader on store file " + storeFile.getPath(), e); + } + } + } + } + + return null; + } + } + + @Test + public 
void testCompactFilesWithOneDelFile() throws Exception { + String tableName = "testCompactFilesWithOneDelFile"; + resetConf(); + init(tableName); + + // Create the mob files and one del file. + createMobFile(basePath); + createMobDelFile(basePath, 2); + + listFiles(); + + MyPartitionedMobCompactor compactor = new MyPartitionedMobCompactor(conf, fs, + TableName.valueOf(tableName), hcd, pool, 1, cacheConf, 1); + + compactor.compact(allFiles, true); + } + + @Test + public void testCompactFilesWithMultiDelFiles() throws Exception { + String tableName = "testCompactFilesWithMultiDelFiles"; + resetConf(); + init(tableName); + + // Create the mob files and three del files. + createMobFile(basePath); + createMobDelFile(basePath, 0); + createMobDelFile(basePath, 1); + createMobDelFile(basePath, 2); + + listFiles(); + + MyPartitionedMobCompactor compactor = new MyPartitionedMobCompactor(conf, fs, + TableName.valueOf(tableName), hcd, pool, 3, cacheConf, 3); + compactor.compact(allFiles, true); + } private void testCompactDelFilesAtBatchSize(String tableName, int batchSize, int delfileMaxCount) throws Exception { @@ -419,12 +648,53 @@ public class TestPartitionedMobCompactor { return null; } PartitionedMobCompactionRequest request = select(files, isForceAllFiles); + + // Make sure that when there are no del files, there is no startKey/endKey for any partition. 
+ if (request.getDelPartitions().size() == 0) { + for (CompactionPartition p : request.getCompactionPartitions()) { + Assert.assertTrue(p.getStartKey() == null); + Assert.assertTrue(p.getEndKey() == null); + } + } + + // Make sure that CompactionDelPartitions does not overlap + CompactionDelPartition prevDelP = null; + for (CompactionDelPartition delP : request.getDelPartitions()) { + Assert.assertTrue(Bytes.compareTo(delP.getId().getStartKey(), + delP.getId().getEndKey()) <= 0); + + if (prevDelP != null) { + Assert.assertTrue(Bytes.compareTo(prevDelP.getId().getEndKey(), + delP.getId().getStartKey()) < 0); + } + } + + // Make sure that only del files within key range for a partition is included in compaction. + // compact the mob files by partitions in parallel. + for (CompactionPartition partition : request.getCompactionPartitions()) { + List delFiles = getListOfDelFilesForPartition(partition, request.getDelPartitions()); + if (!request.getDelPartitions().isEmpty()) { + if (!((Bytes.compareTo(request.getDelPartitions().get(0).getId().getStartKey(), + partition.getEndKey()) > 0) || (Bytes.compareTo( + request.getDelPartitions().get(request.getDelPartitions().size() - 1).getId() + .getEndKey(), partition.getStartKey()) < 0))) { + if (delFiles.size() > 0) { + Assert.assertTrue(Bytes + .compareTo(partition.getStartKey(), delFiles.get(0).getFirstKey().getRowArray()) + >= 0); + Assert.assertTrue(Bytes.compareTo(partition.getEndKey(), + delFiles.get(delFiles.size() - 1).getLastKey().getRowArray()) <= 0); + } + } + } + } + // assert the compaction type Assert.assertEquals(type, request.type); // assert get the right partitions compareCompactedPartitions(expected, request.compactionPartitions); // assert get the right del files - compareDelFiles(request.delFiles); + compareDelFiles(request.getDelPartitions()); return null; } }; @@ -446,8 +716,10 @@ public class TestPartitionedMobCompactor { protected List performCompaction(PartitionedMobCompactionRequest request) throws 
IOException { List delFilePaths = new ArrayList(); - for (FileStatus delFile : request.delFiles) { - delFilePaths.add(delFile.getPath()); + for (CompactionDelPartition delPartition: request.getDelPartitions()) { + for (Path p : delPartition.listDelFiles()) { + delFilePaths.add(p); + } } List newDelPaths = compactDelFiles(request, delFilePaths); // assert the del files are merged. @@ -466,7 +738,7 @@ public class TestPartitionedMobCompactor { for (FileStatus file : fs.listStatus(basePath)) { allFiles.add(file); if (file.getPath().getName().endsWith("_del")) { - delFiles.add(file); + delFiles.add(file.getPath()); } else { mobFiles.add(file); } @@ -493,13 +765,18 @@ public class TestPartitionedMobCompactor { /** * Compares the del files. - * @param allDelFiles all the del files + * @param delPartitions all del partitions */ - private void compareDelFiles(Collection allDelFiles) { + private void compareDelFiles(List delPartitions) { int i = 0; - for (FileStatus file : allDelFiles) { - Assert.assertEquals(delFiles.get(i), file); - i++; + Map delMap = new HashMap<>(); + for (CompactionDelPartition delPartition : delPartitions) { + for (Path f : delPartition.listDelFiles()) { + delMap.put(f, f); + } + } + for (Path f : delFiles) { + Assert.assertTrue(delMap.containsKey(f)); } }