HBASE-12669 - Have compaction scanner save info about delete markers

(Jingcheng Du)
This commit is contained in:
Ramkrishna 2015-01-22 14:52:40 +05:30
parent 36bec4fa07
commit 1b800f7d4c
7 changed files with 297 additions and 13 deletions

View File

@ -28,16 +28,21 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.regionserver.HMobStore;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.MobCompactionStoreScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
import org.apache.hadoop.hbase.util.Bytes;
@ -78,6 +83,21 @@ public class DefaultMobCompactor extends DefaultCompactor {
return writer;
}
@Override
protected InternalScanner createScanner(Store store, List<StoreFileScanner> scanners,
ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException {
Scan scan = new Scan();
scan.setMaxVersions(store.getFamily().getMaxVersions());
if (scanType == ScanType.COMPACT_DROP_DELETES) {
scanType = ScanType.COMPACT_RETAIN_DELETES;
return new MobCompactionStoreScanner(store, store.getScanInfo(), scan, scanners,
scanType, smallestReadPoint, earliestPutTs, true);
} else {
return new MobCompactionStoreScanner(store, store.getScanInfo(), scan, scanners,
scanType, smallestReadPoint, earliestPutTs, false);
}
}
/**
* Performs compaction on a column family with the mob flag enabled.
* This is for when the mob threshold size has changed or if the mob
@ -104,6 +124,14 @@ public class DefaultMobCompactor extends DefaultCompactor {
* Otherwise, directly write this cell into the store file.
* </li>
* </ol>
* In the mob compaction, the {@link MobCompactionStoreScanner} is used as a scanner
* which could output the normal cells and delete markers together when required.
* After the major compaction on the normal hfiles, we have a guarantee that we have purged all
* deleted or old version mob refs, and the delete markers are written to a del file with the
* suffix _del. Because of this, it is safe to use the del file in the mob compaction.
* The mob compaction doesn't take place in the normal hfiles, it occurs directly in the
* mob files. When the small mob files are merged into bigger ones, the del file is added into
* the scanner to filter the deleted cells.
* @param fd File details
* @param scanner Where to read from.
* @param writer Where to write to.
@ -115,6 +143,11 @@ public class DefaultMobCompactor extends DefaultCompactor {
@Override
protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
long smallestReadPoint, boolean cleanSeqId, boolean major) throws IOException {
if (!(scanner instanceof MobCompactionStoreScanner)) {
throw new IllegalArgumentException(
"The scanner should be an instance of MobCompactionStoreScanner");
}
MobCompactionStoreScanner compactionScanner = (MobCompactionStoreScanner) scanner;
int bytesWritten = 0;
// Since scanner.next() can return 'false' but still be delivering data,
// we have to use a do/while loop.
@ -125,7 +158,9 @@ public class DefaultMobCompactor extends DefaultCompactor {
Path path = MobUtils.getMobFamilyPath(conf, store.getTableName(), store.getColumnFamilyName());
byte[] fileName = null;
StoreFile.Writer mobFileWriter = null;
StoreFile.Writer delFileWriter = null;
long mobCells = 0;
long deleteMarkersCount = 0;
Tag tableNameTag = new Tag(TagType.MOB_TABLE_NAME_TAG_TYPE, store.getTableName()
.getName());
long mobCompactedIntoMobCellsCount = 0;
@ -144,14 +179,19 @@ public class DefaultMobCompactor extends DefaultCompactor {
+ "we will continue the compaction by writing MOB cells directly in store files",
e);
}
delFileWriter = mobStore.createDelFileWriterInTmp(new Date(fd.latestPutTs), fd.maxKeyCount,
store.getFamily().getCompression(), store.getRegionInfo().getStartKey());
do {
hasMore = scanner.next(cells, compactionKVMax);
hasMore = compactionScanner.next(cells, compactionKVMax);
// output to writer:
for (Cell c : cells) {
// TODO remove the KeyValueUtil.ensureKeyValue before merging back to trunk.
KeyValue kv = KeyValueUtil.ensureKeyValue(c);
resetSeqId(smallestReadPoint, cleanSeqId, kv);
if (mobFileWriter == null || kv.getTypeByte() != KeyValue.Type.Put.getCode()) {
if (compactionScanner.isOutputDeleteMarkers() && CellUtil.isDelete(c)) {
delFileWriter.append(kv);
deleteMarkersCount++;
} else if (mobFileWriter == null || kv.getTypeByte() != KeyValue.Type.Put.getCode()) {
// If the mob file writer is null or the kv type is not put, directly write the cell
// to the store file.
writer.append(kv);
@ -222,8 +262,12 @@ public class DefaultMobCompactor extends DefaultCompactor {
mobFileWriter.appendMetadata(fd.maxSeqId, major, mobCells);
mobFileWriter.close();
}
if (delFileWriter != null) {
delFileWriter.appendMetadata(fd.maxSeqId, major, deleteMarkersCount);
delFileWriter.close();
}
}
if(mobFileWriter!=null) {
if (mobFileWriter != null) {
if (mobCells > 0) {
// If the mob file is not empty, commit it.
mobStore.commitFile(mobFileWriter.getPath(), path);
@ -236,6 +280,20 @@ public class DefaultMobCompactor extends DefaultCompactor {
}
}
}
if (delFileWriter != null) {
if (deleteMarkersCount > 0) {
// If the del file is not empty, commit it.
// If the commit fails, the compaction is re-performed again.
mobStore.commitFile(delFileWriter.getPath(), path);
} else {
try {
// If the del file is empty, delete it instead of committing.
store.getFileSystem().delete(delFileWriter.getPath(), true);
} catch (IOException e) {
LOG.error("Fail to delete the temp del file", e);
}
}
}
mobStore.updateMobCompactedFromMobCellsCount(mobCompactedFromMobCellsCount);
mobStore.updateMobCompactedIntoMobCellsCount(mobCompactedIntoMobCellsCount);
mobStore.updateMobCompactedFromMobCellsSize(mobCompactedFromMobCellsSize);

View File

@ -165,8 +165,8 @@ public class HMobStore extends HStore {
}
/**
* Creates the temp directory of mob files for flushing.
* @param date The latest date of cells in the flushing.
* Creates the writer for the mob file in temp directory.
* @param date The latest date of written cells.
* @param maxKeyCount The key count.
* @param compression The compression algorithm.
* @param startKey The start key.
@ -183,7 +183,30 @@ public class HMobStore extends HStore {
}
/**
* Creates the temp directory of mob files for flushing.
* Creates the writer for the del file in temp directory.
* The del file keeps tracking the delete markers. Its name has a suffix _del,
* the format is [0-9a-f]+(_del)?.
* @param date The latest date of written cells.
* @param maxKeyCount The key count.
* @param compression The compression algorithm.
* @param startKey The start key.
* @return The writer for the del file.
* @throws IOException
*/
public StoreFile.Writer createDelFileWriterInTmp(Date date, long maxKeyCount,
Compression.Algorithm compression, byte[] startKey) throws IOException {
if (startKey == null) {
startKey = HConstants.EMPTY_START_ROW;
}
Path path = getTempDir();
String suffix = UUID
.randomUUID().toString().replaceAll("-", "") + "_del";
MobFileName mobFileName = MobFileName.create(startKey, MobUtils.formatDate(date), suffix);
return createWriterInTmp(mobFileName, path, maxKeyCount, compression);
}
/**
* Creates the writer for the mob file in temp directory.
* @param date The date string, its format is yyyymmmdd.
* @param basePath The basic path for a temp directory.
* @param maxKeyCount The key count.
@ -196,6 +219,20 @@ public class HMobStore extends HStore {
Compression.Algorithm compression, byte[] startKey) throws IOException {
MobFileName mobFileName = MobFileName.create(startKey, date, UUID.randomUUID()
.toString().replaceAll("-", ""));
return createWriterInTmp(mobFileName, basePath, maxKeyCount, compression);
}
/**
* Creates the writer for the mob file in temp directory.
* @param mobFileName The mob file name.
* @param basePath The basic path for a temp directory.
* @param maxKeyCount The key count.
* @param compression The compression algorithm.
* @return The writer for the mob file.
* @throws IOException
*/
public StoreFile.Writer createWriterInTmp(MobFileName mobFileName, Path basePath, long maxKeyCount,
Compression.Algorithm compression) throws IOException {
final CacheConfig writerCacheConf = mobCacheConfig;
HFileContext hFileContext = new HFileContextBuilder().withCompression(compression)
.withIncludesMvcc(false).withIncludesTags(true)

View File

@ -363,7 +363,7 @@ public class HStore implements Store {
* @param family
* @return TTL in seconds of the specified family
*/
private static long determineTTLFromFamily(final HColumnDescriptor family) {
static long determineTTLFromFamily(final HColumnDescriptor family) {
// HCD.getTimeToLive returns ttl in seconds. Convert to milliseconds.
long ttl = family.getTimeToLive();
if (ttl == HConstants.FOREVER) {

View File

@ -0,0 +1,66 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Scan;
/**
* Scanner scans the MOB Store. Coalesce KeyValue stream into List<KeyValue>
* for a single row. It's only used in the compaction of mob-enabled columns.
* It outputs the normal cells and delete markers when outputDeleteMarkers is set as true.
*/
@InterfaceAudience.Private
public class MobCompactionStoreScanner extends StoreScanner {
/*
* The delete markers are probably contained in the output of the scanner, for instance the
* minor compaction. If outputDeleteMarkers is set as true, these delete markers could be
* written to the del file, otherwise it's not allowed.
*/
protected boolean outputDeleteMarkers;
/**
* Used for compactions.<p>
*
* Opens a scanner across specified StoreFiles.
* @param store who we scan
* @param scan the spec
* @param scanners ancillary scanners
* @param smallestReadPoint the readPoint that we should use for tracking
* versions
*/
public MobCompactionStoreScanner(Store store, ScanInfo scanInfo, Scan scan,
List<? extends KeyValueScanner> scanners, ScanType scanType, long smallestReadPoint,
long earliestPutTs, boolean outputDeleteMarkers) throws IOException {
super(store, scanInfo, scan, scanners, scanType, smallestReadPoint, earliestPutTs);
this.outputDeleteMarkers = outputDeleteMarkers;
}
/**
* Gets whether the delete markers could be written to the del files.
* @return True if the delete markers could be written del files, false if it's not allowed.
*/
public boolean isOutputDeleteMarkers() {
return this.outputDeleteMarkers;
}
}

View File

@ -49,13 +49,24 @@ public class StoreFileInfo implements Comparable<StoreFileInfo> {
/**
* A non-capture group, for hfiles, so that this can be embedded.
* HFiles are uuid ([0-9a-z]+). Bulk loaded hfiles has (_SeqId_[0-9]+_) has suffix.
* The mob del file has (_del) as suffix.
*/
public static final String HFILE_NAME_REGEX = "[0-9a-f]+(?:_SeqId_[0-9]+_)?";
public static final String HFILE_NAME_REGEX = "[0-9a-f]+(?:(?:_SeqId_[0-9]+_)|(?:_del))?";
/** Regex that will work for hfiles */
private static final Pattern HFILE_NAME_PATTERN =
Pattern.compile("^(" + HFILE_NAME_REGEX + ")");
/**
* A non-capture group, for hfiles, so that this can be embedded.
* A del file has (_del) as suffix.
*/
public static final String DELFILE_NAME_REGEX = "[0-9a-f]+(?:_del)";
/** Regex that will work for del files */
private static final Pattern DELFILE_NAME_PATTERN =
Pattern.compile("^(" + DELFILE_NAME_REGEX + ")");
/**
* Regex that will work for straight reference names (<hfile>.<parentEncRegion>)
* and hfilelink reference names (<table>=<region>-<hfile>.<parentEncRegion>)
@ -361,6 +372,23 @@ public class StoreFileInfo implements Comparable<StoreFileInfo> {
return m.matches() && m.groupCount() > 0;
}
/**
* @param path Path to check.
* @return True if the path has format of a del file.
*/
public static boolean isDelFile(final Path path) {
return isDelFile(path.getName());
}
/**
* @param path Path to check.
* @return True if the file name has format of a del file.
*/
public static boolean isDelFile(final String fileName) {
Matcher m = DELFILE_NAME_PATTERN.matcher(fileName);
return m.matches() && m.groupCount() > 0;
}
/**
* @param path Path to check.
* @return True if the path has format of a HStoreFile reference.

View File

@ -273,7 +273,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
0);
}
private StoreScanner(final Scan scan, ScanInfo scanInfo,
StoreScanner(final Scan scan, ScanInfo scanInfo,
ScanType scanType, final NavigableSet<byte[]> columns,
final List<KeyValueScanner> scanners, long earliestPutTs, long readPt)
throws IOException {

View File

@ -280,6 +280,60 @@ public class TestMobCompaction {
scanner.close();
}
@Test
public void testMajorCompactionAfterDelete() throws Exception {
init(UTIL.getConfiguration(), 100);
byte[] dummyData = makeDummyData(200); // larger than mob threshold
HRegionIncommon loader = new HRegionIncommon(region);
// create hfiles and mob hfiles but don't trigger compaction
int numHfiles = compactionThreshold - 1;
byte[] deleteRow = Bytes.add(STARTROW, Bytes.toBytes(0));
for (int i = 0; i < numHfiles; i++) {
Put p = createPut(i, dummyData);
loader.put(p);
loader.flushcache();
}
assertEquals("Before compaction: store files", numHfiles, countStoreFiles());
assertEquals("Before compaction: mob file count", numHfiles, countMobFiles());
assertEquals("Before compaction: rows", numHfiles, countRows());
assertEquals("Before compaction: mob rows", numHfiles, countMobRows());
assertEquals("Before compaction: number of mob cells", numHfiles, countMobCellsInMetadata());
// now let's delete some cells that contain mobs
Delete delete = new Delete(deleteRow);
delete.deleteFamily(COLUMN_FAMILY);
region.delete(delete);
loader.flushcache();
assertEquals("Before compaction: store files", numHfiles + 1, countStoreFiles());
assertEquals("Before compaction: mob files", numHfiles, countMobFiles());
region.compactStores(true);
assertEquals("After compaction: store files", 1, countStoreFiles());
// still have original mob hfiles and now added a mob del file
assertEquals("After compaction: mob files", numHfiles + 1, countMobFiles());
Scan scan = new Scan();
scan.setRaw(true);
InternalScanner scanner = region.getScanner(scan);
List<Cell> results = new ArrayList<Cell>();
scanner.next(results);
int deleteCount = 0;
while (!results.isEmpty()) {
for (Cell c : results) {
if (c.getTypeByte() == KeyValue.Type.DeleteFamily.getCode()) {
deleteCount++;
assertTrue(Bytes.equals(CellUtil.cloneRow(c), deleteRow));
}
}
results.clear();
scanner.next(results);
}
// assert the delete mark is not retained after the major compaction
assertEquals(0, deleteCount);
scanner.close();
// assert the deleted cell is not counted
assertEquals("The cells in mob files", numHfiles - 1, countMobCellsInMobFiles(1));
}
private int countStoreFiles() throws IOException {
Store store = region.getStore(COLUMN_FAMILY);
return store.getStorefilesCount();
@ -348,14 +402,15 @@ public class TestMobCompaction {
int scannedCount = 0;
List<Cell> results = new ArrayList<Cell>();
boolean hasMore = scanner.next(results);
boolean hasMore = true;
while (hasMore) {
hasMore = scanner.next(results);
for (Cell c : results) {
if (MobUtils.isMobReferenceCell(c)) {
scannedCount++;
}
}
hasMore = scanner.next(results);
results.clear();
}
scanner.close();
@ -369,10 +424,11 @@ public class TestMobCompaction {
int scannedCount = 0;
List<Cell> results = new ArrayList<Cell>();
boolean hasMore = scanner.next(results);
boolean hasMore = true;
while (hasMore) {
scannedCount += results.size();
hasMore = scanner.next(results);
scannedCount += results.size();
results.clear();
}
scanner.close();
@ -425,4 +481,43 @@ public class TestMobCompaction {
return files.size();
}
private int countMobCellsInMobFiles(int expectedNumDelfiles) throws IOException {
Configuration copyOfConf = new Configuration(conf);
copyOfConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0f);
CacheConfig cacheConfig = new CacheConfig(copyOfConf);
Path mobDirPath = new Path(MobUtils.getMobRegionPath(conf, htd.getTableName()),
hcd.getNameAsString());
List<StoreFile> sfs = new ArrayList<StoreFile>();
int numDelfiles = 0;
int size = 0;
if (fs.exists(mobDirPath)) {
for (FileStatus f : fs.listStatus(mobDirPath)) {
StoreFile sf = new StoreFile(fs, f.getPath(), conf, cacheConfig, BloomType.NONE);
sfs.add(sf);
if (StoreFileInfo.isDelFile(sf.getPath())) {
numDelfiles++;
}
}
List scanners = StoreFileScanner.getScannersForStoreFiles(sfs, false, true, false, null,
HConstants.LATEST_TIMESTAMP);
Scan scan = new Scan();
scan.setMaxVersions(hcd.getMaxVersions());
long timeToPurgeDeletes = Math.max(conf.getLong("hbase.hstore.time.to.purge.deletes", 0), 0);
long ttl = HStore.determineTTLFromFamily(hcd);
ScanInfo scanInfo = new ScanInfo(hcd, ttl, timeToPurgeDeletes, KeyValue.COMPARATOR);
StoreScanner scanner = new StoreScanner(scan, scanInfo, ScanType.COMPACT_DROP_DELETES, null,
scanners, 0L, HConstants.LATEST_TIMESTAMP);
List<Cell> results = new ArrayList<>();
boolean hasMore = true;
while (hasMore) {
hasMore = scanner.next(results);
size += results.size();
results.clear();
}
}
// assert the number of the existing del files
assertEquals(expectedNumDelfiles, numDelfiles);
return size;
}
}