diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobCompactor.java index 47dd0b18e71..ec0cfe55934 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobCompactor.java @@ -28,16 +28,21 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.Tag; import org.apache.hadoop.hbase.TagType; +import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.regionserver.HMobStore; import org.apache.hadoop.hbase.regionserver.HStore; import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.regionserver.MobCompactionStoreScanner; +import org.apache.hadoop.hbase.regionserver.ScanType; import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.regionserver.StoreFile.Writer; +import org.apache.hadoop.hbase.regionserver.StoreFileScanner; import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor; import org.apache.hadoop.hbase.util.Bytes; @@ -78,6 +83,21 @@ public class DefaultMobCompactor extends DefaultCompactor { return writer; } + @Override + protected InternalScanner createScanner(Store store, List scanners, + ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException { + Scan scan = new Scan(); + scan.setMaxVersions(store.getFamily().getMaxVersions()); + if (scanType == ScanType.COMPACT_DROP_DELETES) { + scanType = ScanType.COMPACT_RETAIN_DELETES; + return new MobCompactionStoreScanner(store, store.getScanInfo(), scan, scanners, + scanType, smallestReadPoint, earliestPutTs, true); + } else { + return new MobCompactionStoreScanner(store, store.getScanInfo(), scan, scanners, + scanType, smallestReadPoint, earliestPutTs, false); + } + } + /** * Performs compaction on a column family with the mob flag enabled. * This is for when the mob threshold size has changed or if the mob @@ -104,6 +124,14 @@ public class DefaultMobCompactor extends DefaultCompactor { * Otherwise, directly write this cell into the store file. * * + * In the mob compaction, the {@link MobCompactionStoreScanner} is used as a scanner + * which could output the normal cells and delete markers together when required. + * After the major compaction on the normal hfiles, we have a guarantee that we have purged all + * deleted or old version mob refs, and the delete markers are written to a del file with the + * suffix _del. Because of this, it is safe to use the del file in the mob compaction. + * The mob compaction doesn't take place in the normal hfiles, it occurs directly in the + * mob files. When the small mob files are merged into bigger ones, the del file is added into + * the scanner to filter the deleted cells. * @param fd File details * @param scanner Where to read from. * @param writer Where to write to. @@ -115,6 +143,11 @@ public class DefaultMobCompactor extends DefaultCompactor { @Override protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer, long smallestReadPoint, boolean cleanSeqId, boolean major) throws IOException { + if (!(scanner instanceof MobCompactionStoreScanner)) { + throw new IllegalArgumentException( + "The scanner should be an instance of MobCompactionStoreScanner"); + } + MobCompactionStoreScanner compactionScanner = (MobCompactionStoreScanner) scanner; int bytesWritten = 0; // Since scanner.next() can return 'false' but still be delivering data, // we have to use a do/while loop. @@ -125,7 +158,9 @@ public class DefaultMobCompactor extends DefaultCompactor { Path path = MobUtils.getMobFamilyPath(conf, store.getTableName(), store.getColumnFamilyName()); byte[] fileName = null; StoreFile.Writer mobFileWriter = null; + StoreFile.Writer delFileWriter = null; long mobCells = 0; + long deleteMarkersCount = 0; Tag tableNameTag = new Tag(TagType.MOB_TABLE_NAME_TAG_TYPE, store.getTableName() .getName()); long mobCompactedIntoMobCellsCount = 0; @@ -144,14 +179,19 @@ public class DefaultMobCompactor extends DefaultCompactor { + "we will continue the compaction by writing MOB cells directly in store files", e); } + delFileWriter = mobStore.createDelFileWriterInTmp(new Date(fd.latestPutTs), fd.maxKeyCount, + store.getFamily().getCompression(), store.getRegionInfo().getStartKey()); do { - hasMore = scanner.next(cells, compactionKVMax); + hasMore = compactionScanner.next(cells, compactionKVMax); // output to writer: for (Cell c : cells) { // TODO remove the KeyValueUtil.ensureKeyValue before merging back to trunk. KeyValue kv = KeyValueUtil.ensureKeyValue(c); resetSeqId(smallestReadPoint, cleanSeqId, kv); - if (mobFileWriter == null || kv.getTypeByte() != KeyValue.Type.Put.getCode()) { + if (compactionScanner.isOutputDeleteMarkers() && CellUtil.isDelete(c)) { + delFileWriter.append(kv); + deleteMarkersCount++; + } else if (mobFileWriter == null || kv.getTypeByte() != KeyValue.Type.Put.getCode()) { // If the mob file writer is null or the kv type is not put, directly write the cell // to the store file. writer.append(kv); @@ -222,8 +262,12 @@ public class DefaultMobCompactor extends DefaultCompactor { mobFileWriter.appendMetadata(fd.maxSeqId, major, mobCells); mobFileWriter.close(); } + if (delFileWriter != null) { + delFileWriter.appendMetadata(fd.maxSeqId, major, deleteMarkersCount); + delFileWriter.close(); + } } - if(mobFileWriter!=null) { + if (mobFileWriter != null) { if (mobCells > 0) { // If the mob file is not empty, commit it. mobStore.commitFile(mobFileWriter.getPath(), path); @@ -236,6 +280,20 @@ public class DefaultMobCompactor extends DefaultCompactor { } } } + if (delFileWriter != null) { + if (deleteMarkersCount > 0) { + // If the del file is not empty, commit it. + // If the commit fails, the compaction is re-performed again. + mobStore.commitFile(delFileWriter.getPath(), path); + } else { + try { + // If the del file is empty, delete it instead of committing. + store.getFileSystem().delete(delFileWriter.getPath(), true); + } catch (IOException e) { + LOG.error("Fail to delete the temp del file", e); + } + } + } mobStore.updateMobCompactedFromMobCellsCount(mobCompactedFromMobCellsCount); mobStore.updateMobCompactedIntoMobCellsCount(mobCompactedIntoMobCellsCount); mobStore.updateMobCompactedFromMobCellsSize(mobCompactedFromMobCellsSize); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java index 569ad0661ef..218a4efa493 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java @@ -165,8 +165,8 @@ public class HMobStore extends HStore { } /** - * Creates the temp directory of mob files for flushing. - * @param date The latest date of cells in the flushing. + * Creates the writer for the mob file in temp directory. + * @param date The latest date of written cells. * @param maxKeyCount The key count. * @param compression The compression algorithm. * @param startKey The start key. @@ -183,7 +183,30 @@ public class HMobStore extends HStore { } /** - * Creates the temp directory of mob files for flushing. + * Creates the writer for the del file in temp directory. + * The del file keeps tracking the delete markers. Its name has a suffix _del, + * the format is [0-9a-f]+(_del)?. + * @param date The latest date of written cells. + * @param maxKeyCount The key count. + * @param compression The compression algorithm. + * @param startKey The start key. + * @return The writer for the del file. + * @throws IOException + */ + public StoreFile.Writer createDelFileWriterInTmp(Date date, long maxKeyCount, + Compression.Algorithm compression, byte[] startKey) throws IOException { + if (startKey == null) { + startKey = HConstants.EMPTY_START_ROW; + } + Path path = getTempDir(); + String suffix = UUID + .randomUUID().toString().replaceAll("-", "") + "_del"; + MobFileName mobFileName = MobFileName.create(startKey, MobUtils.formatDate(date), suffix); + return createWriterInTmp(mobFileName, path, maxKeyCount, compression); + } + + /** + * Creates the writer for the mob file in temp directory. * @param date The date string, its format is yyyymmmdd. * @param basePath The basic path for a temp directory. * @param maxKeyCount The key count. @@ -196,6 +219,20 @@ public class HMobStore extends HStore { Compression.Algorithm compression, byte[] startKey) throws IOException { MobFileName mobFileName = MobFileName.create(startKey, date, UUID.randomUUID() .toString().replaceAll("-", "")); + return createWriterInTmp(mobFileName, basePath, maxKeyCount, compression); + } + + /** + * Creates the writer for the mob file in temp directory. + * @param mobFileName The mob file name. + * @param basePath The basic path for a temp directory. + * @param maxKeyCount The key count. + * @param compression The compression algorithm. + * @return The writer for the mob file. + * @throws IOException + */ + public StoreFile.Writer createWriterInTmp(MobFileName mobFileName, Path basePath, long maxKeyCount, + Compression.Algorithm compression) throws IOException { final CacheConfig writerCacheConf = mobCacheConfig; HFileContext hFileContext = new HFileContextBuilder().withCompression(compression) .withIncludesMvcc(false).withIncludesTags(true) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index 45edf7f5d2e..ad7318bb39f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -363,7 +363,7 @@ public class HStore implements Store { * @param family * @return TTL in seconds of the specified family */ - private static long determineTTLFromFamily(final HColumnDescriptor family) { + static long determineTTLFromFamily(final HColumnDescriptor family) { // HCD.getTimeToLive returns ttl in seconds. Convert to milliseconds. long ttl = family.getTimeToLive(); if (ttl == HConstants.FOREVER) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MobCompactionStoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MobCompactionStoreScanner.java new file mode 100644 index 00000000000..fc14fa4908c --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MobCompactionStoreScanner.java @@ -0,0 +1,66 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Scan; + +/** + * Scanner scans the MOB Store. Coalesce KeyValue stream into List + * for a single row. It's only used in the compaction of mob-enabled columns. + * It outputs the normal cells and delete markers when outputDeleteMarkers is set as true. + */ +@InterfaceAudience.Private +public class MobCompactionStoreScanner extends StoreScanner { + + /* + * The delete markers are probably contained in the output of the scanner, for instance the + * minor compaction. If outputDeleteMarkers is set as true, these delete markers could be + * written to the del file, otherwise it's not allowed. + */ + protected boolean outputDeleteMarkers; + + /** + * Used for compactions.

+ * + * Opens a scanner across specified StoreFiles. + * @param store who we scan + * @param scan the spec + * @param scanners ancillary scanners + * @param smallestReadPoint the readPoint that we should use for tracking + * versions + */ + public MobCompactionStoreScanner(Store store, ScanInfo scanInfo, Scan scan, + List scanners, ScanType scanType, long smallestReadPoint, + long earliestPutTs, boolean outputDeleteMarkers) throws IOException { + super(store, scanInfo, scan, scanners, scanType, smallestReadPoint, earliestPutTs); + this.outputDeleteMarkers = outputDeleteMarkers; + } + + /** + * Gets whether the delete markers could be written to the del files. + * @return True if the delete markers could be written del files, false if it's not allowed. + */ + public boolean isOutputDeleteMarkers() { + return this.outputDeleteMarkers; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java index 5034b1b9810..7515cef338c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java @@ -49,13 +49,24 @@ public class StoreFileInfo implements Comparable { /** * A non-capture group, for hfiles, so that this can be embedded. * HFiles are uuid ([0-9a-z]+). Bulk loaded hfiles has (_SeqId_[0-9]+_) has suffix. + * The mob del file has (_del) as suffix. */ - public static final String HFILE_NAME_REGEX = "[0-9a-f]+(?:_SeqId_[0-9]+_)?"; + public static final String HFILE_NAME_REGEX = "[0-9a-f]+(?:(?:_SeqId_[0-9]+_)|(?:_del))?"; /** Regex that will work for hfiles */ private static final Pattern HFILE_NAME_PATTERN = Pattern.compile("^(" + HFILE_NAME_REGEX + ")"); + /** + * A non-capture group, for hfiles, so that this can be embedded. + * A del file has (_del) as suffix. + */ + public static final String DELFILE_NAME_REGEX = "[0-9a-f]+(?:_del)"; + + /** Regex that will work for del files */ + private static final Pattern DELFILE_NAME_PATTERN = + Pattern.compile("^(" + DELFILE_NAME_REGEX + ")"); + /** * Regex that will work for straight reference names (.) * and hfilelink reference names (=-.) @@ -361,6 +372,23 @@ public class StoreFileInfo implements Comparable { return m.matches() && m.groupCount() > 0; } + /** + * @param path Path to check. + * @return True if the path has format of a del file. + */ + public static boolean isDelFile(final Path path) { + return isDelFile(path.getName()); + } + + /** + * @param path Path to check. + * @return True if the file name has format of a del file. + */ + public static boolean isDelFile(final String fileName) { + Matcher m = DELFILE_NAME_PATTERN.matcher(fileName); + return m.matches() && m.groupCount() > 0; + } + /** * @param path Path to check. * @return True if the path has format of a HStoreFile reference. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java index 7160b309608..5519b4b6c36 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java @@ -273,7 +273,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner 0); } - private StoreScanner(final Scan scan, ScanInfo scanInfo, + StoreScanner(final Scan scan, ScanInfo scanInfo, ScanType scanType, final NavigableSet columns, final List scanners, long earliestPutTs, long readPt) throws IOException { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobCompaction.java index 3234628eb5f..fb85e87d362 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobCompaction.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobCompaction.java @@ -280,6 +280,60 @@ public class TestMobCompaction { scanner.close(); } + @Test + public void testMajorCompactionAfterDelete() throws Exception { + init(UTIL.getConfiguration(), 100); + byte[] dummyData = makeDummyData(200); // larger than mob threshold + HRegionIncommon loader = new HRegionIncommon(region); + // create hfiles and mob hfiles but don't trigger compaction + int numHfiles = compactionThreshold - 1; + byte[] deleteRow = Bytes.add(STARTROW, Bytes.toBytes(0)); + for (int i = 0; i < numHfiles; i++) { + Put p = createPut(i, dummyData); + loader.put(p); + loader.flushcache(); + } + assertEquals("Before compaction: store files", numHfiles, countStoreFiles()); + assertEquals("Before compaction: mob file count", numHfiles, countMobFiles()); + assertEquals("Before compaction: rows", numHfiles, countRows()); + assertEquals("Before compaction: mob rows", numHfiles, countMobRows()); + assertEquals("Before compaction: number of mob cells", numHfiles, countMobCellsInMetadata()); + // now let's delete some cells that contain mobs + Delete delete = new Delete(deleteRow); + delete.deleteFamily(COLUMN_FAMILY); + region.delete(delete); + loader.flushcache(); + + assertEquals("Before compaction: store files", numHfiles + 1, countStoreFiles()); + assertEquals("Before compaction: mob files", numHfiles, countMobFiles()); + region.compactStores(true); + assertEquals("After compaction: store files", 1, countStoreFiles()); + // still have original mob hfiles and now added a mob del file + assertEquals("After compaction: mob files", numHfiles + 1, countMobFiles()); + + Scan scan = new Scan(); + scan.setRaw(true); + InternalScanner scanner = region.getScanner(scan); + List results = new ArrayList(); + scanner.next(results); + int deleteCount = 0; + while (!results.isEmpty()) { + for (Cell c : results) { + if (c.getTypeByte() == KeyValue.Type.DeleteFamily.getCode()) { + deleteCount++; + assertTrue(Bytes.equals(CellUtil.cloneRow(c), deleteRow)); + } + } + results.clear(); + scanner.next(results); + } + // assert the delete mark is not retained after the major compaction + assertEquals(0, deleteCount); + scanner.close(); + // assert the deleted cell is not counted + assertEquals("The cells in mob files", numHfiles - 1, countMobCellsInMobFiles(1)); + } + private int countStoreFiles() throws IOException { Store store = region.getStore(COLUMN_FAMILY); return store.getStorefilesCount(); @@ -348,14 +402,15 @@ public class TestMobCompaction { int scannedCount = 0; List results = new ArrayList(); - boolean hasMore = scanner.next(results); + boolean hasMore = true; while (hasMore) { + hasMore = scanner.next(results); for (Cell c : results) { if (MobUtils.isMobReferenceCell(c)) { scannedCount++; } } - hasMore = scanner.next(results); + results.clear(); } scanner.close(); @@ -369,10 +424,11 @@ public class TestMobCompaction { int scannedCount = 0; List results = new ArrayList(); - boolean hasMore = scanner.next(results); + boolean hasMore = true; while (hasMore) { - scannedCount += results.size(); hasMore = scanner.next(results); + scannedCount += results.size(); + results.clear(); } scanner.close(); @@ -425,4 +481,43 @@ public class TestMobCompaction { return files.size(); } + + private int countMobCellsInMobFiles(int expectedNumDelfiles) throws IOException { + Configuration copyOfConf = new Configuration(conf); + copyOfConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0f); + CacheConfig cacheConfig = new CacheConfig(copyOfConf); + Path mobDirPath = new Path(MobUtils.getMobRegionPath(conf, htd.getTableName()), + hcd.getNameAsString()); + List sfs = new ArrayList(); + int numDelfiles = 0; + int size = 0; + if (fs.exists(mobDirPath)) { + for (FileStatus f : fs.listStatus(mobDirPath)) { + StoreFile sf = new StoreFile(fs, f.getPath(), conf, cacheConfig, BloomType.NONE); + sfs.add(sf); + if (StoreFileInfo.isDelFile(sf.getPath())) { + numDelfiles++; + } + } + List scanners = StoreFileScanner.getScannersForStoreFiles(sfs, false, true, false, null, + HConstants.LATEST_TIMESTAMP); + Scan scan = new Scan(); + scan.setMaxVersions(hcd.getMaxVersions()); + long timeToPurgeDeletes = Math.max(conf.getLong("hbase.hstore.time.to.purge.deletes", 0), 0); + long ttl = HStore.determineTTLFromFamily(hcd); + ScanInfo scanInfo = new ScanInfo(hcd, ttl, timeToPurgeDeletes, KeyValue.COMPARATOR); + StoreScanner scanner = new StoreScanner(scan, scanInfo, ScanType.COMPACT_DROP_DELETES, null, + scanners, 0L, HConstants.LATEST_TIMESTAMP); + List results = new ArrayList<>(); + boolean hasMore = true; + while (hasMore) { + hasMore = scanner.next(results); + size += results.size(); + results.clear(); + } + } + // assert the number of the existing del files + assertEquals(expectedNumDelfiles, numDelfiles); + return size; + } }