HBASE-16812 Clean up the locks in MOB

This commit is contained in:
Jingcheng Du 2017-01-31 12:15:09 +08:00
parent e68ab09d5e
commit 47ce72d78c
9 changed files with 93 additions and 202 deletions

View File

@ -36,12 +36,12 @@ import org.apache.hadoop.hbase.regionserver.CellSink;
import org.apache.hadoop.hbase.regionserver.HMobStore;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.MobCompactionStoreScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
@ -62,8 +62,8 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
@Override
public ScanType getScanType(CompactionRequest request) {
  // MOB store compactions always retain delete markers until they expire:
  // the markers are needed later so they can be written to del files during
  // the mob compaction. (The merged diff left two return statements here;
  // the second was unreachable — keep only the retain-deletes behavior.)
  return ScanType.COMPACT_RETAIN_DELETES;
}
@Override
@ -71,16 +71,8 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
ScanType scanType, FileDetails fd, long smallestReadPoint) throws IOException {
Scan scan = new Scan();
scan.setMaxVersions(store.getFamily().getMaxVersions());
if (scanType == ScanType.COMPACT_DROP_DELETES) {
// In major compaction, we need to write the delete markers to del files, so we have to
// retain the them in scanning.
scanType = ScanType.COMPACT_RETAIN_DELETES;
return new MobCompactionStoreScanner(store, store.getScanInfo(), scan, scanners,
scanType, smallestReadPoint, fd.earliestPutTs, true);
} else {
return new MobCompactionStoreScanner(store, store.getScanInfo(), scan, scanners,
scanType, smallestReadPoint, fd.earliestPutTs, false);
}
return new StoreScanner(store, store.getScanInfo(), scan, scanners, scanType,
smallestReadPoint, fd.earliestPutTs);
}
};
@ -115,14 +107,12 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
return compact(request, scannerFactory, writerFactory, throughputController, user);
}
// TODO refactor to take advantage of the throughput controller.
/**
* Performs compaction on a column family with the mob flag enabled.
* This is for when the mob threshold size has changed or if the mob
* column family mode has been toggled via an alter table statement.
* Compacts the files by the following rules.
* 1. If the cell has a mob reference tag, the cell's value is the path of the mob file.
* 1. If the Put cell has a mob reference tag, the cell's value is the path of the mob file.
* <ol>
* <li>
* If the value size of a cell is larger than the threshold, this cell is regarded as a mob,
@ -133,7 +123,7 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
* the new store file.
* </li>
* </ol>
* 2. If the cell doesn't have a reference tag.
* 2. If the Put cell doesn't have a reference tag.
* <ol>
* <li>
* If the value size of a cell is larger than the threshold, this cell is regarded as a mob,
@ -143,8 +133,17 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
* Otherwise, directly write this cell into the store file.
* </li>
* </ol>
* In the mob compaction, the {@link MobCompactionStoreScanner} is used as a scanner
* which could output the normal cells and delete markers together when required.
* 3. Decide how to write a Delete cell.
* <ol>
* <li>
* If a Delete cell does not have a mob reference tag, which means this delete marker has not
* been written to the mob del file, write this cell to the mob del file, and write this cell
* with a ref tag to a store file.
* </li>
* <li>
* Otherwise, directly write it to a store file.
* </li>
* </ol>
* After the major compaction on the normal hfiles, we have a guarantee that we have purged all
* deleted or old version mob refs, and the delete markers are written to a del file with the
* suffix _del. Because of this, it is safe to use the del file in the mob compaction.
@ -165,11 +164,6 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController,
boolean major, int numofFilesToCompact) throws IOException {
if (!(scanner instanceof MobCompactionStoreScanner)) {
throw new IllegalArgumentException(
"The scanner should be an instance of MobCompactionStoreScanner");
}
MobCompactionStoreScanner compactionScanner = (MobCompactionStoreScanner) scanner;
int bytesWritten = 0;
// Since scanner.next() can return 'false' but still be delivering data,
// we have to use a do/while loop.
@ -198,11 +192,19 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
ScannerContext scannerContext =
ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();
do {
hasMore = compactionScanner.next(cells, scannerContext);
hasMore = scanner.next(cells, scannerContext);
for (Cell c : cells) {
if (compactionScanner.isOutputDeleteMarkers() && CellUtil.isDelete(c)) {
delFileWriter.append(c);
deleteMarkersCount++;
if (major && CellUtil.isDelete(c)) {
if (MobUtils.isMobReferenceCell(c)) {
// Directly write it to a store file
writer.append(c);
} else {
// Add a ref tag to this cell and write it to a store file.
writer.append(MobUtils.createMobRefDeleteMarker(c));
// Write the cell to a del file
delFileWriter.append(c);
deleteMarkersCount++;
}
} else if (mobFileWriter == null || c.getTypeByte() != KeyValue.Type.Put.getCode()) {
// If the mob file writer is null or the kv type is not put, directly write the cell
// to the store file.

View File

@ -61,7 +61,6 @@ import org.apache.hadoop.hbase.io.crypto.Encryption;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.locking.LockManager;
import org.apache.hadoop.hbase.mob.compactions.MobCompactor;
import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactor;
@ -91,6 +90,12 @@ public final class MobUtils {
}
};
// Serialized tag list containing only the mob ref tag. It is appended to
// delete markers when they are rewritten into store files (see
// createMobRefDeleteMarker below), marking them as already written to a del file.
private static final byte[] REF_DELETE_MARKER_TAG_BYTES;
static {
List<Tag> tags = new ArrayList<>();
tags.add(MobConstants.MOB_REF_TAG);
REF_DELETE_MARKER_TAG_BYTES = TagUtil.fromList(tags);
}
/**
* Private constructor to keep this class from being instantiated.
@ -810,4 +815,43 @@ public final class MobUtils {
}
HFileArchiver.archiveStoreFiles(conf, fs, mobRegionInfo, mobFamilyDir, family, storeFileList);
}
/**
 * Builds a copy of the given delete marker that carries the mob ref tag.
 * @param cell The delete marker to copy.
 * @return A new delete marker whose tags are the original tags plus the ref tag.
 */
public static Cell createMobRefDeleteMarker(Cell cell) {
  byte[] taggedBytes = TagUtil.concatTags(REF_DELETE_MARKER_TAG_BYTES, cell);
  return CellUtil.createCell(cell, taggedBytes);
}
/**
 * Checks if the mob file is expired.
 * @param column The descriptor of the current column family.
 * @param current The current time in milliseconds.
 * @param fileDate The date string parsed from the mob file name.
 * @return True if the mob file is expired, false otherwise (including when the
 *         date string cannot be parsed).
 */
public static boolean isMobFileExpired(HColumnDescriptor column, long current, String fileDate) {
// If min versions are kept, files may contain versions that must survive TTL.
if (column.getMinVersions() > 0) {
return false;
}
long timeToLive = column.getTimeToLive();
// Integer.MAX_VALUE is the "TTL not set" sentinel — presumably HConstants.FOREVER,
// confirm; nothing expires in that case.
if (Integer.MAX_VALUE == timeToLive) {
return false;
}
// Expiration cutoff, truncated to the start of that day. NOTE: these are
// deprecated Date APIs and the truncation happens in the local time zone.
Date expireDate = new Date(current - timeToLive * 1000);
expireDate = new Date(expireDate.getYear(), expireDate.getMonth(), expireDate.getDate());
try {
Date date = parseDate(fileDate);
// Expired only if the file's day is strictly before the cutoff day.
if (date.getTime() < expireDate.getTime()) {
return true;
}
} catch (ParseException e) {
// Conservative: an unparseable file date is treated as not expired.
LOG.warn("Failed to parse the date " + fileDate, e);
return false;
}
return false;
}
}

View File

@ -75,6 +75,7 @@ import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.security.EncryptionUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
/**
@ -327,8 +328,9 @@ public class PartitionedMobCompactor extends MobCompactor {
* @param request The compaction request.
* @param partition A compaction partition.
* @param delFiles The del files.
* @param connection The connection to use.
* @param table The current table.
* @return The paths of new mob files after compactions.
* @throws IOException if IO failure is encountered
*/
private List<Path> compactMobFilePartition(PartitionedMobCompactionRequest request,
@ -336,6 +338,12 @@ public class PartitionedMobCompactor extends MobCompactor {
List<StoreFile> delFiles,
Connection connection,
Table table) throws IOException {
if (MobUtils.isMobFileExpired(column, EnvironmentEdgeManager.currentTime(),
partition.getPartitionId().getDate())) {
// If the files in the partition are expired, do not compact them and directly
// return an empty list.
return Collections.emptyList();
}
List<Path> newFiles = new ArrayList<>();
List<FileStatus> files = partition.listFiles();
int offset = 0;

View File

@ -20,14 +20,12 @@ package org.apache.hadoop.hbase.regionserver;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -39,7 +37,6 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.TableName;
@ -48,7 +45,6 @@ import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.TagUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.locking.EntityLock;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.io.compress.Compression;
@ -62,9 +58,6 @@ import org.apache.hadoop.hbase.mob.MobFile;
import org.apache.hadoop.hbase.mob.MobFileName;
import org.apache.hadoop.hbase.mob.MobStoreEngine;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
@ -459,66 +452,6 @@ public class HMobStore extends HStore {
return mobFamilyPath;
}
/**
* The compaction in the mob store.
* The cells in this store contains the path of the mob files. There might be race
* condition between the major compaction and the mob major compaction.
* In order to avoid this, we need mutually exclude the running of the major compaction
* and the mob major compaction.
* The minor compaction is not affected.
* The major compaction is marked as retainDeleteMarkers when a mob major
* compaction is in progress.
*/
@Override
public List<StoreFile> compact(CompactionContext compaction,
ThroughputController throughputController, User user) throws IOException {
// If it's major compaction, try to find whether there's a mob major compaction is running
// If yes, mark the major compaction as retainDeleteMarkers
if (compaction.getRequest().isAllFiles()) {
// Acquire a table lock to coordinate.
// 1. If no, mark the major compaction as retainDeleteMarkers and continue the compaction.
// 2. If the lock is obtained, run the compaction directly.
EntityLock lock = null;
try {
if (region.getRegionServerServices() != null) {
List<HRegionInfo> regionInfos = Collections.singletonList(region.getRegionInfo());
// regionLock takes shared lock on table too.
lock = region.getRegionServerServices().regionLock(regionInfos, "MOB compaction", null);
int awaitTime = conf.getInt(HRegionServer.REGION_LOCK_AWAIT_TIME_SEC,
HRegionServer.DEFAULT_REGION_LOCK_AWAIT_TIME_SEC);
try {
LOG.info("Acquiring MOB major compaction lock " + lock);
lock.requestLock();
lock.await(awaitTime, TimeUnit.SECONDS);
} catch (InterruptedException e) {
LOG.error("Interrupted exception when waiting for lock: " + lock, e);
}
if (!lock.isLocked()) {
// Remove lock from queue on the master so that if it's granted in future, we don't
// keep holding it until compaction finishes
lock.unlock();
lock = null;
LOG.warn("Cannot obtain table lock, maybe a sweep tool is running on this " + "table["
+ getTableName() + "], forcing the delete markers to be retained");
}
} else {
LOG.warn("Cannot obtain lock because RegionServices not available. Are we running as "
+ "compaction tool?");
}
// If no lock, retain delete markers to be safe.
if (lock == null) compaction.getRequest().forceRetainDeleteMarkers();
return super.compact(compaction, throughputController, user);
} finally {
if (lock != null && lock.isLocked()) {
lock.unlock();
}
}
} else {
// If it's not a major compaction, continue the compaction.
return super.compact(compaction, throughputController, user);
}
}
public void updateCellsCountCompactedToMob(long count) {
cellsCountCompactedToMob += count;
}

View File

@ -1,66 +0,0 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Scan;
/**
 * A {@link StoreScanner} used exclusively for compacting mob-enabled column
 * families. It coalesces the KeyValue stream into a {@code List<KeyValue>}
 * per row and, when configured to do so, emits delete markers together with
 * the normal cells.
 */
@InterfaceAudience.Private
public class MobCompactionStoreScanner extends StoreScanner {

  /*
   * Whether delete markers may appear in this scanner's output (for instance
   * during a minor compaction). When true, the markers may be written to the
   * del file; otherwise emitting them is not allowed.
   */
  protected boolean outputDeleteMarkers;

  /**
   * Opens a compaction scanner across the specified store files.
   * @param store The store being scanned.
   * @param scanInfo Scan metadata for the store.
   * @param scan The scan specification.
   * @param scanners The ancillary file scanners.
   * @param scanType The type of compaction scan.
   * @param smallestReadPoint The read point used for tracking versions.
   * @param earliestPutTs The earliest put timestamp among the files.
   * @param outputDeleteMarkers Whether delete markers may be emitted.
   * @throws IOException If the underlying scanners cannot be opened.
   */
  public MobCompactionStoreScanner(Store store, ScanInfo scanInfo, Scan scan,
      List<? extends KeyValueScanner> scanners, ScanType scanType, long smallestReadPoint,
      long earliestPutTs, boolean outputDeleteMarkers) throws IOException {
    super(store, scanInfo, scan, scanners, scanType, smallestReadPoint, earliestPutTs);
    this.outputDeleteMarkers = outputDeleteMarkers;
  }

  /**
   * Reports whether delete markers may be written to the del files.
   * @return True if the delete markers may be written to del files, false otherwise.
   */
  public boolean isOutputDeleteMarkers() {
    return this.outputDeleteMarkers;
  }
}

View File

@ -59,8 +59,6 @@ public class CompactionRequest implements Comparable<CompactionRequest> {
private String storeName = "";
private long totalSize = -1L;
private Boolean retainDeleteMarkers = null;
/**
* This ctor should be used by coprocessors that want to subclass CompactionRequest.
*/
@ -207,23 +205,6 @@ public class CompactionRequest implements Comparable<CompactionRequest> {
: (isMajor ? DisplayCompactionType.MAJOR : DisplayCompactionType.ALL_FILES);
}
/**
* Forcefully setting that this compaction has to retain the delete markers in the new compacted
* file, whatever be the type of the compaction.<br>
* Note : By default HBase drops delete markers when the compaction is on all files.
*/
public void forceRetainDeleteMarkers() {
this.retainDeleteMarkers = Boolean.TRUE;
}
/**
* @return Whether the compaction has to retain the delete markers or not.
*/
public boolean isRetainDeleteMarkers() {
return (this.retainDeleteMarkers != null) ? this.retainDeleteMarkers.booleanValue()
: !isAllFiles();
}
@Override
public String toString() {
String fsList = Joiner.on(", ").join(

View File

@ -231,8 +231,8 @@ public abstract class Compactor<T extends CellSink> {
@Override
public ScanType getScanType(CompactionRequest request) {
  // Delete markers may be dropped only when every file is being compacted
  // (a major/all-files compaction); otherwise they must be retained so a
  // later major compaction can still see them. (The merged diff left two
  // return statements here; the second was unreachable — keep the
  // isAllFiles()-based form.)
  return request.isAllFiles() ? ScanType.COMPACT_DROP_DELETES
      : ScanType.COMPACT_RETAIN_DELETES;
}
@Override

View File

@ -50,8 +50,8 @@ public class TestMobFileName extends TestCase {
public void testHashCode() {
  // Equal inputs must produce equal hash codes.
  assertEquals(MobFileName.create(startKey, dateStr, uuid).hashCode(),
    MobFileName.create(startKey, dateStr, uuid).hashCode());
  // Two create() calls yield distinct instances even for identical inputs.
  // (The stale assertNotSame on the hash codes contradicted the assertEquals
  // above and has been dropped; identity is asserted on the objects instead.)
  assertNotSame(MobFileName.create(startKey, dateStr, uuid),
    MobFileName.create(startKey, dateStr, uuid));
}
@Test

View File

@ -245,13 +245,7 @@ public class TestMobStoreCompaction {
region.compact(true);
assertEquals("After compaction: store files", 1, countStoreFiles());
// still have original mob hfiles and now added a mob del file
// CHANGED EXPECTATION WHEN LOCKING CHANGED. In this context, there is no locking because there
// is not regionserverservices provided on the region (it is null). In this case when
// no services and therefore no means of getting a lock, we will run the mob compaction
// with compaction.getRequest().forceRetainDeleteMarkers();
// .. .this messes w/ expected number. It is one less than when we run
// with the locks.
assertEquals("After compaction: mob files", numHfiles, countMobFiles());
assertEquals("After compaction: mob files", numHfiles + 1, countMobFiles());
Scan scan = new Scan();
scan.setRaw(true);
@ -269,16 +263,11 @@ public class TestMobStoreCompaction {
results.clear();
scanner.next(results);
}
// Assert the delete mark is not retained after the major compaction
// See CHANGED EXPECTATION WHEN LOCKING CHANGED note above. Here too we have different
// expectation in the new locking regime.
// assertEquals(0, deleteCount);
// assert the delete mark is retained after the major compaction
assertEquals(1, deleteCount);
scanner.close();
// assert the deleted cell is not counted
// See CHANGED EXPECTATION WHEN LOCKING CHANGED note above. Here too we have different
// expectation in the new locking regime. We were passing '1' and we had numHFiles -1...
// but changed in below.
assertEquals("The cells in mob files", numHfiles, countMobCellsInMobFiles(0));
assertEquals("The cells in mob files", numHfiles - 1, countMobCellsInMobFiles(1));
}
private int countStoreFiles() throws IOException {