HBASE-26969:Eliminate MOB renames when SFT is enabled (#4418)

Signed-off-by: Wellington Chevreuil <wchevreuil@apache.org>
This commit is contained in:
BukrosSzabolcs 2022-06-21 11:18:55 +02:00 committed by GitHub
parent b498efdbc1
commit 5cf728da5a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 860 additions and 98 deletions

View File

@ -293,20 +293,20 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
* @param cleanSeqId When true, remove seqId(used to be mvcc) value which is <=
* smallestReadPoint
* @param throughputController The compaction throughput controller.
* @param major Is a major compaction.
* @param numofFilesToCompact the number of files to compact
* @param request compaction request.
* @param progress Progress reporter.
* @return Whether compaction ended; false if it was interrupted for any reason.
*/
@Override
protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController,
boolean major, int numofFilesToCompact, CompactionProgress progress) throws IOException {
CompactionRequestImpl request, CompactionProgress progress) throws IOException {
long bytesWrittenProgressForLog = 0;
long bytesWrittenProgressForShippedCall = 0;
// Clear old mob references
mobRefSet.get().clear();
boolean isUserRequest = userRequest.get();
boolean major = request.isAllFiles();
boolean compactMOBs = major && isUserRequest;
boolean discardMobMiss = conf.getBoolean(MobConstants.MOB_UNSAFE_DISCARD_MISS_KEY,
MobConstants.DEFAULT_MOB_DISCARD_MISS);
@ -350,12 +350,12 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
throughputController.start(compactionName);
KeyValueScanner kvs = (scanner instanceof KeyValueScanner) ? (KeyValueScanner) scanner : null;
long shippedCallSizeLimit =
(long) numofFilesToCompact * this.store.getColumnFamilyDescriptor().getBlocksize();
(long) request.getFiles().size() * this.store.getColumnFamilyDescriptor().getBlocksize();
Cell mobCell = null;
try {
mobFileWriter = newMobWriter(fd, major);
mobFileWriter = newMobWriter(fd, major, request.getWriterCreationTracker());
fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
do {
@ -435,7 +435,7 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
LOG.debug("Closing output MOB File, length={} file={}, store={}", len,
mobFileWriter.getPath().getName(), getStoreInfo());
commitOrAbortMobWriter(mobFileWriter, fd.maxSeqId, mobCells, major);
mobFileWriter = newMobWriter(fd, major);
mobFileWriter = newMobWriter(fd, major, request.getWriterCreationTracker());
fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
mobCells = 0;
}
@ -479,7 +479,7 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
long len = mobFileWriter.getPos();
if (len > maxMobFileSize) {
commitOrAbortMobWriter(mobFileWriter, fd.maxSeqId, mobCells, major);
mobFileWriter = newMobWriter(fd, major);
mobFileWriter = newMobWriter(fd, major, request.getWriterCreationTracker());
fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
mobCells = 0;
}
@ -531,7 +531,7 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
long len = mobFileWriter.getPos();
if (len > maxMobFileSize) {
commitOrAbortMobWriter(mobFileWriter, fd.maxSeqId, mobCells, major);
mobFileWriter = newMobWriter(fd, major);
mobFileWriter = newMobWriter(fd, major, request.getWriterCreationTracker());
fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
mobCells = 0;
}
@ -617,11 +617,16 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
}
}
private StoreFileWriter newMobWriter(FileDetails fd, boolean major) throws IOException {
private StoreFileWriter newMobWriter(FileDetails fd, boolean major,
Consumer<Path> writerCreationTracker) throws IOException {
try {
StoreFileWriter mobFileWriter = mobStore.createWriterInTmp(new Date(fd.latestPutTs),
fd.maxKeyCount, major ? majorCompactionCompression : minorCompactionCompression,
store.getRegionInfo().getStartKey(), true);
StoreFileWriter mobFileWriter = mobStore.getStoreEngine().requireWritingToTmpDirFirst()
? mobStore.createWriterInTmp(new Date(fd.latestPutTs), fd.maxKeyCount,
major ? majorCompactionCompression : minorCompactionCompression,
store.getRegionInfo().getStartKey(), true)
: mobStore.createWriter(new Date(fd.latestPutTs), fd.maxKeyCount,
major ? majorCompactionCompression : minorCompactionCompression,
store.getRegionInfo().getStartKey(), true, writerCreationTracker);
LOG.debug("New MOB writer created={} store={}", mobFileWriter.getPath().getName(),
getStoreInfo());
// Add reference we get for compact MOB

View File

@ -127,7 +127,8 @@ public class DefaultMobStoreFlusher extends DefaultStoreFlusher {
try {
// It's a mob store, flush the cells in a mob way. This is the difference of flushing
// between a normal and a mob store.
performMobFlush(snapshot, cacheFlushId, scanner, writer, status, throughputController);
performMobFlush(snapshot, cacheFlushId, scanner, writer, status, throughputController,
writerCreationTracker);
} catch (IOException ioe) {
e = ioe;
// throw the exception out
@ -171,16 +172,21 @@ public class DefaultMobStoreFlusher extends DefaultStoreFlusher {
*/
protected void performMobFlush(MemStoreSnapshot snapshot, long cacheFlushId,
InternalScanner scanner, StoreFileWriter writer, MonitoredTask status,
ThroughputController throughputController) throws IOException {
ThroughputController throughputController, Consumer<Path> writerCreationTracker)
throws IOException {
StoreFileWriter mobFileWriter = null;
int compactionKVMax =
conf.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT);
long mobCount = 0;
long mobSize = 0;
long time = snapshot.getTimeRangeTracker().getMax();
mobFileWriter = mobStore.createWriterInTmp(new Date(time), snapshot.getCellsCount(),
mobFileWriter = mobStore.getStoreEngine().requireWritingToTmpDirFirst()
? mobStore.createWriterInTmp(new Date(time), snapshot.getCellsCount(),
store.getColumnFamilyDescriptor().getCompressionType(), store.getRegionInfo().getStartKey(),
false);
false)
: mobStore.createWriter(new Date(time), snapshot.getCellsCount(),
store.getColumnFamilyDescriptor().getCompressionType(), store.getRegionInfo().getStartKey(),
false, writerCreationTracker);
// the target path is {tableName}/.mob/{cfName}/mobFiles
// the relative path is mobFiles
byte[] fileName = Bytes.toBytes(mobFileWriter.getPath().getName());

View File

@ -160,14 +160,15 @@ public class MobFileCleanerChore extends ScheduledChore {
maxCreationTimeToArchive, table);
}
FileSystem fs = FileSystem.get(conf);
Set<String> regionNames = new HashSet<>();
Path rootDir = CommonFSUtils.getRootDir(conf);
Path tableDir = CommonFSUtils.getTableDir(rootDir, table);
// How safe is this call?
List<Path> regionDirs = FSUtils.getRegionDirs(FileSystem.get(conf), tableDir);
List<Path> regionDirs = FSUtils.getRegionDirs(fs, tableDir);
Set<String> allActiveMobFileName = new HashSet<String>();
FileSystem fs = FileSystem.get(conf);
for (Path regionPath : regionDirs) {
regionNames.add(regionPath.getName());
for (ColumnFamilyDescriptor hcd : list) {
String family = hcd.getNameAsString();
Path storePath = new Path(regionPath, family);
@ -195,13 +196,26 @@ public class MobFileCleanerChore extends ScheduledChore {
for (Path pp : storeFiles) {
currentPath = pp;
LOG.trace("Store file: {}", pp);
HStoreFile sf =
new HStoreFile(fs, pp, conf, CacheConfig.DISABLED, BloomType.NONE, true);
HStoreFile sf = null;
byte[] mobRefData = null;
byte[] bulkloadMarkerData = null;
try {
sf = new HStoreFile(fs, pp, conf, CacheConfig.DISABLED, BloomType.NONE, true);
sf.initReader();
byte[] mobRefData = sf.getMetadataValue(HStoreFile.MOB_FILE_REFS);
byte[] bulkloadMarkerData = sf.getMetadataValue(HStoreFile.BULKLOAD_TASK_KEY);
mobRefData = sf.getMetadataValue(HStoreFile.MOB_FILE_REFS);
bulkloadMarkerData = sf.getMetadataValue(HStoreFile.BULKLOAD_TASK_KEY);
// close store file to avoid memory leaks
sf.closeStoreFile(true);
} catch (IOException ex) {
// When FileBased SFT is active the store dir can contain corrupted or incomplete
// files. So read errors are expected. We just skip these files.
if (ex instanceof FileNotFoundException) {
throw ex;
}
LOG.debug("Failed to get mob data from file: {} due to error.", pp.toString(),
ex);
continue;
}
if (mobRefData == null) {
if (bulkloadMarkerData == null) {
LOG.warn("Found old store file with no MOB_FILE_REFS: {} - "
@ -256,9 +270,11 @@ public class MobFileCleanerChore extends ScheduledChore {
while (rit.hasNext()) {
LocatedFileStatus lfs = rit.next();
Path p = lfs.getPath();
if (!allActiveMobFileName.contains(p.getName())) {
// MOB is not in a list of active references, but it can be too
// fresh, skip it in this case
String[] mobParts = p.getName().split("_");
String regionName = mobParts[mobParts.length - 1];
if (!regionNames.contains(regionName)) {
// MOB belonged to a region no longer hosted
long creationTime = fs.getFileStatus(p).getModificationTime();
if (creationTime < maxCreationTimeToArchive) {
LOG.trace("Archiving MOB file {} creation time={}", p,
@ -269,7 +285,7 @@ public class MobFileCleanerChore extends ScheduledChore {
fs.getFileStatus(p).getModificationTime());
}
} else {
LOG.trace("Keeping active MOB file: {}", p);
LOG.trace("Keeping MOB file with existing region: {}", p);
}
}
LOG.info(" MOB Cleaner found {} files to archive for table={} family={}", toArchive.size(),

View File

@ -33,6 +33,7 @@ import java.util.Date;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.function.Consumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@ -584,6 +585,33 @@ public final class MobUtils {
CacheConfig cacheConfig, Encryption.Context cryptoContext, ChecksumType checksumType,
int bytesPerChecksum, int blocksize, BloomType bloomType, boolean isCompaction)
throws IOException {
return createWriter(conf, fs, family, path, maxKeyCount, compression, cacheConfig,
cryptoContext, checksumType, bytesPerChecksum, blocksize, bloomType, isCompaction, null);
}
/**
* Creates a writer for the mob file in temp directory.
* @param conf The current configuration.
* @param fs The current file system.
* @param family The descriptor of the current column family.
* @param path The path for a temp directory.
* @param maxKeyCount The key count.
* @param compression The compression algorithm.
* @param cacheConfig The current cache config.
* @param cryptoContext The encryption context.
* @param checksumType The checksum type.
* @param bytesPerChecksum The bytes per checksum.
* @param blocksize The HFile block size.
* @param bloomType The bloom filter type.
* @param isCompaction If the writer is used in compaction.
* @param writerCreationTracker to track the current writer in the store
* @return The writer for the mob file.
*/
public static StoreFileWriter createWriter(Configuration conf, FileSystem fs,
ColumnFamilyDescriptor family, Path path, long maxKeyCount, Compression.Algorithm compression,
CacheConfig cacheConfig, Encryption.Context cryptoContext, ChecksumType checksumType,
int bytesPerChecksum, int blocksize, BloomType bloomType, boolean isCompaction,
Consumer<Path> writerCreationTracker) throws IOException {
if (compression == null) {
compression = HFile.DEFAULT_COMPRESSION_ALGORITHM;
}
@ -602,7 +630,8 @@ public final class MobUtils {
.withCreateTime(EnvironmentEdgeManager.currentTime()).build();
StoreFileWriter w = new StoreFileWriter.Builder(conf, writerCacheConf, fs).withFilePath(path)
.withBloomType(bloomType).withMaxKeyCount(maxKeyCount).withFileContext(hFileContext).build();
.withBloomType(bloomType).withMaxKeyCount(maxKeyCount).withFileContext(hFileContext)
.withWriterCreationTracker(writerCreationTracker).build();
return w;
}

View File

@ -0,0 +1,272 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mob;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.HFileArchiver;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.collect.SetMultimap;
/**
* The class RSMobFileCleanerChore for running cleaner regularly to remove the obsolete (files which
* have no active references to) mob files that were referenced from the current RS.
*/
@InterfaceAudience.Private
public class RSMobFileCleanerChore extends ScheduledChore {
private static final Logger LOG = LoggerFactory.getLogger(RSMobFileCleanerChore.class);
private final HRegionServer rs;
public RSMobFileCleanerChore(HRegionServer rs) {
super(rs.getServerName() + "-MobFileCleanerChore", rs,
rs.getConfiguration().getInt(MobConstants.MOB_CLEANER_PERIOD,
MobConstants.DEFAULT_MOB_CLEANER_PERIOD),
Math.round(rs.getConfiguration().getInt(MobConstants.MOB_CLEANER_PERIOD,
MobConstants.DEFAULT_MOB_CLEANER_PERIOD)
* ((ThreadLocalRandom.current().nextDouble() + 0.5D))),
TimeUnit.SECONDS);
// to prevent a load spike on the fs the initial delay is modified by +/- 50%
this.rs = rs;
}
public RSMobFileCleanerChore() {
this.rs = null;
}
@Override
protected void chore() {
long minAgeToArchive = rs.getConfiguration().getLong(MobConstants.MIN_AGE_TO_ARCHIVE_KEY,
MobConstants.DEFAULT_MIN_AGE_TO_ARCHIVE);
// We check only those MOB files, which creation time is less
// than maxCreationTimeToArchive. This is a current time - 1h. 1 hour gap
// gives us full confidence that all corresponding store files will
// exist at the time cleaning procedure begins and will be examined.
// So, if MOB file creation time is greater than this maxTimeToArchive,
// this will be skipped and won't be archived.
long maxCreationTimeToArchive = EnvironmentEdgeManager.currentTime() - minAgeToArchive;
TableDescriptors htds = rs.getTableDescriptors();
try {
FileSystem fs = FileSystem.get(rs.getConfiguration());
Map<String, TableDescriptor> map = null;
try {
map = htds.getAll();
} catch (IOException e) {
LOG.error("MobFileCleanerChore failed", e);
return;
}
Map<String, Map<String, List<String>>> referencedMOBs = new HashMap<>();
for (TableDescriptor htd : map.values()) {
// Now clean obsolete files for a table
LOG.info("Cleaning obsolete MOB files from table={}", htd.getTableName());
List<ColumnFamilyDescriptor> list = MobUtils.getMobColumnFamilies(htd);
List<HRegion> regions = rs.getRegions(htd.getTableName());
for (HRegion region : regions) {
for (ColumnFamilyDescriptor hcd : list) {
HStore store = region.getStore(hcd.getName());
Collection<HStoreFile> sfs = store.getStorefiles();
Set<String> regionMobs = new HashSet<String>();
Path currentPath = null;
try {
// collectinng referenced MOBs
for (HStoreFile sf : sfs) {
currentPath = sf.getPath();
sf.initReader();
byte[] mobRefData = sf.getMetadataValue(HStoreFile.MOB_FILE_REFS);
byte[] bulkloadMarkerData = sf.getMetadataValue(HStoreFile.BULKLOAD_TASK_KEY);
// close store file to avoid memory leaks
sf.closeStoreFile(true);
if (mobRefData == null) {
if (bulkloadMarkerData == null) {
LOG.warn(
"Found old store file with no MOB_FILE_REFS: {} - "
+ "can not proceed until all old files will be MOB-compacted.",
currentPath);
return;
} else {
LOG.debug("Skipping file without MOB references (bulkloaded file):{}",
currentPath);
continue;
}
}
// file may or may not have MOB references, but was created by the distributed
// mob compaction code.
try {
SetMultimap<TableName, String> mobs =
MobUtils.deserializeMobFileRefs(mobRefData).build();
LOG.debug("Found {} mob references for store={}", mobs.size(), sf);
LOG.trace("Specific mob references found for store={} : {}", sf, mobs);
regionMobs.addAll(mobs.values());
} catch (RuntimeException exception) {
throw new IOException("failure getting mob references for hfile " + sf,
exception);
}
}
// collecting files, MOB included currently being written
regionMobs.addAll(store.getStoreFilesBeingWritten().stream()
.map(path -> path.getName()).collect(Collectors.toList()));
referencedMOBs
.computeIfAbsent(hcd.getNameAsString(), cf -> new HashMap<String, List<String>>())
.computeIfAbsent(region.getRegionInfo().getEncodedName(), name -> new ArrayList<>())
.addAll(regionMobs);
} catch (FileNotFoundException e) {
LOG.warn(
"Missing file:{} Starting MOB cleaning cycle from the beginning" + " due to error",
currentPath, e);
regionMobs.clear();
continue;
} catch (IOException e) {
LOG.error("Failed to clean the obsolete mob files for table={}",
htd.getTableName().getNameAsString(), e);
}
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("Found: {} active mob refs for table={}",
referencedMOBs.values().stream().map(inner -> inner.values())
.flatMap(lists -> lists.stream()).mapToInt(lists -> lists.size()).sum(),
htd.getTableName().getNameAsString());
}
if (LOG.isTraceEnabled()) {
referencedMOBs.values().stream().forEach(innerMap -> innerMap.values().stream()
.forEach(mobFileList -> mobFileList.stream().forEach(LOG::trace)));
}
// collect regions referencing MOB files belonging to the current rs
Set<String> regionsCovered = new HashSet<>();
referencedMOBs.values().stream()
.forEach(regionMap -> regionsCovered.addAll(regionMap.keySet()));
for (ColumnFamilyDescriptor hcd : list) {
List<Path> toArchive = new ArrayList<Path>();
String family = hcd.getNameAsString();
Path dir = MobUtils.getMobFamilyPath(rs.getConfiguration(), htd.getTableName(), family);
RemoteIterator<LocatedFileStatus> rit = fs.listLocatedStatus(dir);
while (rit.hasNext()) {
LocatedFileStatus lfs = rit.next();
Path p = lfs.getPath();
String[] mobParts = p.getName().split("_");
String regionName = mobParts[mobParts.length - 1];
// skip MOB files not belonging to a region assigned to the current rs
if (!regionsCovered.contains(regionName)) {
LOG.trace("MOB file does not belong to current rs: {}", p);
continue;
}
// check active or actively written mob files
Map<String, List<String>> cfMobs = referencedMOBs.get(hcd.getNameAsString());
if (
cfMobs != null && cfMobs.get(regionName) != null
&& cfMobs.get(regionName).contains(p.getName())
) {
LOG.trace("Keeping active MOB file: {}", p);
continue;
}
// MOB is not in a list of active references, but it can be too
// fresh, skip it in this case
long creationTime = fs.getFileStatus(p).getModificationTime();
if (creationTime < maxCreationTimeToArchive) {
LOG.trace("Archiving MOB file {} creation time={}", p,
(fs.getFileStatus(p).getModificationTime()));
toArchive.add(p);
} else {
LOG.trace("Skipping fresh file: {}. Creation time={}", p,
fs.getFileStatus(p).getModificationTime());
}
}
LOG.info(" MOB Cleaner found {} files to archive for table={} family={}",
toArchive.size(), htd.getTableName().getNameAsString(), family);
archiveMobFiles(rs.getConfiguration(), htd.getTableName(), family.getBytes(), toArchive);
LOG.info(" MOB Cleaner archived {} files, table={} family={}", toArchive.size(),
htd.getTableName().getNameAsString(), family);
}
LOG.info("Cleaning obsolete MOB files finished for table={}", htd.getTableName());
}
} catch (IOException e) {
LOG.error("MOB Cleaner failed when trying to access the file system", e);
}
}
/**
* Archives the mob files.
* @param conf The current configuration.
* @param tableName The table name.
* @param family The name of the column family.
* @param storeFiles The files to be archived.
* @throws IOException exception
*/
public void archiveMobFiles(Configuration conf, TableName tableName, byte[] family,
List<Path> storeFiles) throws IOException {
if (storeFiles.size() == 0) {
// nothing to remove
LOG.debug("Skipping archiving old MOB files - no files found for table={} cf={}", tableName,
Bytes.toString(family));
return;
}
Path mobTableDir = CommonFSUtils.getTableDir(MobUtils.getMobHome(conf), tableName);
FileSystem fs = storeFiles.get(0).getFileSystem(conf);
for (Path p : storeFiles) {
LOG.debug("MOB Cleaner is archiving: {}", p);
HFileArchiver.archiveStoreFile(conf, fs, MobUtils.getMobRegionInfo(tableName), mobTableDir,
family, p);
}
}
}

View File

@ -28,6 +28,7 @@ import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@ -184,7 +185,27 @@ public class HMobStore extends HStore {
}
Path path = getTempDir();
return createWriterInTmp(MobUtils.formatDate(date), path, maxKeyCount, compression, startKey,
isCompaction);
isCompaction, null);
}
/**
* Creates the writer for the mob file in the mob family directory.
* @param date The latest date of written cells.
* @param maxKeyCount The key count.
* @param compression The compression algorithm.
* @param startKey The start key.
* @param isCompaction If the writer is used in compaction.
* @return The writer for the mob file. n
*/
public StoreFileWriter createWriter(Date date, long maxKeyCount,
Compression.Algorithm compression, byte[] startKey, boolean isCompaction,
Consumer<Path> writerCreationTracker) throws IOException {
if (startKey == null) {
startKey = HConstants.EMPTY_START_ROW;
}
Path path = getPath();
return createWriterInTmp(MobUtils.formatDate(date), path, maxKeyCount, compression, startKey,
isCompaction, writerCreationTracker);
}
/**
@ -198,11 +219,13 @@ public class HMobStore extends HStore {
* @return The writer for the mob file. n
*/
public StoreFileWriter createWriterInTmp(String date, Path basePath, long maxKeyCount,
Compression.Algorithm compression, byte[] startKey, boolean isCompaction) throws IOException {
Compression.Algorithm compression, byte[] startKey, boolean isCompaction,
Consumer<Path> writerCreationTracker) throws IOException {
MobFileName mobFileName =
MobFileName.create(startKey, date, UUID.randomUUID().toString().replaceAll("-", ""),
getHRegion().getRegionInfo().getEncodedName());
return createWriterInTmp(mobFileName, basePath, maxKeyCount, compression, isCompaction);
return createWriterInTmp(mobFileName, basePath, maxKeyCount, compression, isCompaction,
writerCreationTracker);
}
/**
@ -214,13 +237,15 @@ public class HMobStore extends HStore {
* @param isCompaction If the writer is used in compaction.
* @return The writer for the mob file. n
*/
public StoreFileWriter createWriterInTmp(MobFileName mobFileName, Path basePath, long maxKeyCount,
Compression.Algorithm compression, boolean isCompaction) throws IOException {
Compression.Algorithm compression, boolean isCompaction, Consumer<Path> writerCreationTracker)
throws IOException {
return MobUtils.createWriter(conf, getFileSystem(), getColumnFamilyDescriptor(),
new Path(basePath, mobFileName.getFileName()), maxKeyCount, compression, getCacheConfig(),
getStoreContext().getEncryptionContext(), StoreUtils.getChecksumType(conf),
StoreUtils.getBytesPerChecksum(conf), getStoreContext().getBlockSize(), BloomType.NONE,
isCompaction);
isCompaction, writerCreationTracker);
}
/**
@ -234,6 +259,10 @@ public class HMobStore extends HStore {
}
Path dstPath = new Path(targetPath, sourceFile.getName());
validateMobFile(sourceFile);
if (sourceFile.equals(targetPath)) {
LOG.info("File is already in the destination dir: {}", sourceFile);
return;
}
LOG.info(" FLUSH Renaming flushed file from {} to {}", sourceFile, dstPath);
Path parent = dstPath.getParent();
if (!getFileSystem().exists(parent)) {

View File

@ -108,6 +108,7 @@ import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.mob.MobFileCache;
import org.apache.hadoop.hbase.mob.RSMobFileCleanerChore;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
import org.apache.hadoop.hbase.namequeues.SlowLogTableOpsChore;
@ -438,6 +439,8 @@ public class HRegionServer extends HBaseServerBase<RSRpcServices>
private BrokenStoreFileCleaner brokenStoreFileCleaner;
private RSMobFileCleanerChore rsMobFileCleanerChore;
@InterfaceAudience.Private
CompactedHFilesDischarger compactedFileDischarger;
@ -1898,6 +1901,10 @@ public class HRegionServer extends HBaseServerBase<RSRpcServices>
choreService.scheduleChore(brokenStoreFileCleaner);
}
if (this.rsMobFileCleanerChore != null) {
choreService.scheduleChore(rsMobFileCleanerChore);
}
// Leases is not a Thread. Internally it runs a daemon thread. If it gets
// an unhandled exception, it will just exit.
Threads.setDaemonThreadRunning(this.leaseManager, getName() + ".leaseChecker",
@ -1994,6 +2001,8 @@ public class HRegionServer extends HBaseServerBase<RSRpcServices>
new BrokenStoreFileCleaner((int) (brokenStoreFileCleanerDelay + jitterValue),
brokenStoreFileCleanerPeriod, this, conf, this);
this.rsMobFileCleanerChore = new RSMobFileCleanerChore(this);
registerConfigurationObservers();
}
@ -3550,6 +3559,11 @@ public class HRegionServer extends HBaseServerBase<RSRpcServices>
return brokenStoreFileCleaner;
}
@InterfaceAudience.Private
public RSMobFileCleanerChore getRSMobFileCleanerChore() {
return rsMobFileCleanerChore;
}
RSSnapshotVerifier getRsSnapshotVerifier() {
return rsSnapshotVerifier;
}
@ -3566,6 +3580,7 @@ public class HRegionServer extends HBaseServerBase<RSRpcServices>
shutdownChore(fsUtilizationChore);
shutdownChore(slowLogTableOpsChore);
shutdownChore(brokenStoreFileCleaner);
shutdownChore(rsMobFileCleanerChore);
}
@Override

View File

@ -2432,7 +2432,7 @@ public class HStore
* {@link BrokenStoreFileCleaner} to prevent deleting the these files as they are not present in
* SFT yet.
*/
Set<Path> getStoreFilesBeingWritten() {
public Set<Path> getStoreFilesBeingWritten() {
return storeFileWriterCreationTrackers.stream().flatMap(t -> t.get().stream())
.collect(Collectors.toSet());
}

View File

@ -362,7 +362,7 @@ public abstract class Compactor<T extends CellSink> {
writer = sinkFactory.createWriter(scanner, fd, dropCache, request.isMajor(),
request.getWriterCreationTracker());
finished = performCompaction(fd, scanner, writer, smallestReadPoint, cleanSeqId,
throughputController, request.isAllFiles(), request.getFiles().size(), progress);
throughputController, request, progress);
if (!finished) {
throw new InterruptedIOException("Aborting compaction of store " + store + " in region "
+ store.getRegionInfo().getRegionNameAsString() + " because it was interrupted.");
@ -407,14 +407,13 @@ public abstract class Compactor<T extends CellSink> {
* @param smallestReadPoint Smallest read point.
* @param cleanSeqId When true, remove seqId(used to be mvcc) value which is &lt;=
* smallestReadPoint
* @param major Is a major compaction.
* @param numofFilesToCompact the number of files to compact
* @param request compaction request.
* @param progress Progress reporter.
* @return Whether compaction ended; false if it was interrupted for some reason.
*/
protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController,
boolean major, int numofFilesToCompact, CompactionProgress progress) throws IOException {
CompactionRequestImpl request, CompactionProgress progress) throws IOException {
assert writer instanceof ShipperListener;
long bytesWrittenProgressForLog = 0;
long bytesWrittenProgressForShippedCall = 0;
@ -436,7 +435,7 @@ public abstract class Compactor<T extends CellSink> {
throughputController.start(compactionName);
KeyValueScanner kvs = (scanner instanceof KeyValueScanner) ? (KeyValueScanner) scanner : null;
long shippedCallSizeLimit =
(long) numofFilesToCompact * this.store.getColumnFamilyDescriptor().getBlocksize();
(long) request.getFiles().size() * this.store.getColumnFamilyDescriptor().getBlocksize();
try {
do {
hasMore = scanner.next(cells, scannerContext);

View File

@ -44,6 +44,7 @@ import org.apache.hadoop.hbase.regionserver.ShipperListener;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.compactions.CloseChecker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.util.Bytes;
@ -92,8 +93,9 @@ public class FaultyMobStoreCompactor extends DefaultMobStoreCompactor {
@Override
protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController,
boolean major, int numofFilesToCompact, CompactionProgress progress) throws IOException {
CompactionRequestImpl request, CompactionProgress progress) throws IOException {
boolean major = request.isAllFiles();
totalCompactions.incrementAndGet();
if (major) {
totalMajorCompactions.incrementAndGet();
@ -145,7 +147,7 @@ public class FaultyMobStoreCompactor extends DefaultMobStoreCompactor {
throughputController.start(compactionName);
KeyValueScanner kvs = (scanner instanceof KeyValueScanner) ? (KeyValueScanner) scanner : null;
long shippedCallSizeLimit =
(long) numofFilesToCompact * this.store.getColumnFamilyDescriptor().getBlocksize();
(long) request.getFiles().size() * this.store.getColumnFamilyDescriptor().getBlocksize();
Cell mobCell = null;

View File

@ -18,17 +18,26 @@
package org.apache.hadoop.hbase.mob;
import java.io.IOException;
import java.util.Date;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.crypto.Encryption;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@ -112,4 +121,17 @@ public class MobTestUtil {
scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
return HBaseTestingUtil.countRows(table, scan);
}
public static Path generateMOBFileForRegion(Configuration conf, TableName tableName,
ColumnFamilyDescriptor familyDescriptor, String regionName) throws IOException {
Date date = new Date();
String dateStr = MobUtils.formatDate(date);
FileSystem fs = FileSystem.get(conf);
Path cfMOBDir = MobUtils.getMobFamilyPath(conf, tableName, familyDescriptor.getNameAsString());
StoreFileWriter writer = MobUtils.createWriter(conf, fs, familyDescriptor, dateStr, cfMOBDir,
1000L, Compression.Algorithm.NONE, "startKey", CacheConfig.DISABLED, Encryption.Context.NONE,
false, "");
writer.close();
return writer.getPath();
}
}

View File

@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.mob;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
@ -31,17 +33,21 @@ import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.After;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
@Category(LargeTests.class)
public class TestDefaultMobStoreFlusher {
@ -61,19 +67,35 @@ public class TestDefaultMobStoreFlusher {
@Rule
public TestName name = new TestName();
@BeforeClass
public static void setUpBeforeClass() throws Exception {
protected Boolean useFileBasedSFT;
public TestDefaultMobStoreFlusher(Boolean useFileBasedSFT) {
this.useFileBasedSFT = useFileBasedSFT;
}
@Parameterized.Parameters
public static Collection<Boolean> data() {
Boolean[] data = { false, true };
return Arrays.asList(data);
}
@Before
public void setUpBefore() throws Exception {
if (useFileBasedSFT) {
TEST_UTIL.getConfiguration().set(StoreFileTrackerFactory.TRACKER_IMPL,
"org.apache.hadoop.hbase.regionserver.storefiletracker.FileBasedStoreFileTracker");
}
TEST_UTIL.startMiniCluster(1);
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
@After
public void tearDownAfter() throws Exception {
TEST_UTIL.shutdownMiniCluster();
}
@Test
public void testFlushNonMobFile() throws Exception {
final TableName tableName = TableName.valueOf(name.getMethodName());
final TableName tableName = TableName.valueOf(TestMobUtils.getTableName(name));
TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName)
.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(4).build())
.build();
@ -82,7 +104,7 @@ public class TestDefaultMobStoreFlusher {
@Test
public void testFlushMobFile() throws Exception {
final TableName tableName = TableName.valueOf(name.getMethodName());
final TableName tableName = TableName.valueOf(TestMobUtils.getTableName(name));
TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName)
.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(family).setMobEnabled(true)
.setMobThreshold(3L).setMaxVersions(4).build())

View File

@ -17,10 +17,8 @@
*/
package org.apache.hadoop.hbase.mob;
import java.io.IOException;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.experimental.categories.Category;
@ -41,12 +39,13 @@ public class TestMobCompactionOptMode extends TestMobCompactionWithDefaults {
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestMobCompactionOptMode.class);
@BeforeClass
public static void configureOptimizedCompaction() throws InterruptedException, IOException {
HTU.shutdownMiniHBaseCluster();
public TestMobCompactionOptMode(Boolean useFileBasedSFT) {
super(useFileBasedSFT);
}
protected void additonalConfigSetup() {
conf.set(MobConstants.MOB_COMPACTION_TYPE_KEY, MobConstants.OPTIMIZED_MOB_COMPACTION_TYPE);
conf.setLong(MobConstants.MOB_COMPACTION_MAX_FILE_SIZE_KEY, 1000000);
HTU.startMiniHBaseCluster();
}
@Override

View File

@ -23,9 +23,10 @@ import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -39,6 +40,7 @@ import org.slf4j.LoggerFactory;
* time larger than minimum age to archive 10. Runs Mob cleaner chore 11 Verifies that number of MOB
* files in a mob directory is 20. 12 Runs scanner and checks all 3 * 1000 rows.
*/
@RunWith(Parameterized.class)
@Category(LargeTests.class)
public class TestMobCompactionOptRegionBatchMode extends TestMobCompactionWithDefaults {
private static final Logger LOG =
@ -50,20 +52,20 @@ public class TestMobCompactionOptRegionBatchMode extends TestMobCompactionWithDe
private static final int batchSize = 7;
private MobFileCompactionChore compactionChore;
public TestMobCompactionOptRegionBatchMode(Boolean useFileBasedSFT) {
super(useFileBasedSFT);
}
@Before
public void setUp() throws Exception {
super.setUp();
compactionChore = new MobFileCompactionChore(conf, batchSize);
}
@BeforeClass
public static void configureOptimizedCompactionAndBatches()
throws InterruptedException, IOException {
HTU.shutdownMiniHBaseCluster();
protected void additonalConfigSetup() {
conf.setInt(MobConstants.MOB_MAJOR_COMPACTION_REGION_BATCH_SIZE, batchSize);
conf.set(MobConstants.MOB_COMPACTION_TYPE_KEY, MobConstants.OPTIMIZED_MOB_COMPACTION_TYPE);
conf.setLong(MobConstants.MOB_COMPACTION_MAX_FILE_SIZE_KEY, 1000000);
HTU.startMiniHBaseCluster();
}
@Override

View File

@ -23,9 +23,10 @@ import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -39,6 +40,7 @@ import org.slf4j.LoggerFactory;
* to archive 10. Runs Mob cleaner chore 11 Verifies that number of MOB files in a mob directory is
* 20. 12 Runs scanner and checks all 3 * 1000 rows.
*/
@RunWith(Parameterized.class)
@Category(LargeTests.class)
public class TestMobCompactionRegularRegionBatchMode extends TestMobCompactionWithDefaults {
private static final Logger LOG =
@ -50,17 +52,18 @@ public class TestMobCompactionRegularRegionBatchMode extends TestMobCompactionWi
private static final int batchSize = 7;
private MobFileCompactionChore compactionChore;
public TestMobCompactionRegularRegionBatchMode(Boolean useFileBasedSFT) {
super(useFileBasedSFT);
}
@Before
public void setUp() throws Exception {
super.setUp();
compactionChore = new MobFileCompactionChore(conf, batchSize);
}
@BeforeClass
public static void configureCompactionBatches() throws InterruptedException, IOException {
HTU.shutdownMiniHBaseCluster();
protected void additonalConfigSetup() {
conf.setInt(MobConstants.MOB_MAJOR_COMPACTION_REGION_BATCH_SIZE, batchSize);
HTU.startMiniHBaseCluster();
}
@Override

View File

@ -23,6 +23,7 @@ import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
@ -31,6 +32,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
@ -41,18 +43,19 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -66,6 +69,7 @@ import org.slf4j.LoggerFactory;
* Runs Mob cleaner chore 11 Verifies that number of MOB files in a mob directory is 20. 12 Runs
* scanner and checks all 3 * 1000 rows.
*/
@RunWith(Parameterized.class)
@Category(LargeTests.class)
public class TestMobCompactionWithDefaults {
private static final Logger LOG = LoggerFactory.getLogger(TestMobCompactionWithDefaults.class);
@ -73,7 +77,7 @@ public class TestMobCompactionWithDefaults {
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestMobCompactionWithDefaults.class);
protected static HBaseTestingUtil HTU;
protected HBaseTestingUtil HTU;
protected static Configuration conf;
protected static long minAgeToArchive = 10000;
@ -95,8 +99,19 @@ public class TestMobCompactionWithDefaults {
protected MobFileCleanerChore cleanerChore;
@BeforeClass
public static void htuStart() throws Exception {
protected Boolean useFileBasedSFT;
public TestMobCompactionWithDefaults(Boolean useFileBasedSFT) {
this.useFileBasedSFT = useFileBasedSFT;
}
@Parameterized.Parameters
public static Collection<Boolean> data() {
Boolean[] data = { false, true };
return Arrays.asList(data);
}
protected void htuStart() throws Exception {
HTU = new HBaseTestingUtil();
conf = HTU.getConfiguration();
conf.setInt("hfile.format.version", 3);
@ -109,21 +124,25 @@ public class TestMobCompactionWithDefaults {
// Set compacted file discharger interval to a half minAgeToArchive
conf.setLong("hbase.hfile.compaction.discharger.interval", minAgeToArchive / 2);
conf.setBoolean("hbase.regionserver.compaction.enabled", false);
if (useFileBasedSFT) {
conf.set(StoreFileTrackerFactory.TRACKER_IMPL,
"org.apache.hadoop.hbase.regionserver.storefiletracker.FileBasedStoreFileTracker");
}
additonalConfigSetup();
HTU.startMiniCluster();
}
@AfterClass
public static void htuStop() throws Exception {
HTU.shutdownMiniCluster();
protected void additonalConfigSetup() {
}
@Before
public void setUp() throws Exception {
htuStart();
admin = HTU.getAdmin();
cleanerChore = new MobFileCleanerChore();
familyDescriptor = ColumnFamilyDescriptorBuilder.newBuilder(fam).setMobEnabled(true)
.setMobThreshold(mobLen).setMaxVersions(1).build();
tableDescriptor = HTU.createModifyableTableDescriptor(test.getMethodName())
tableDescriptor = HTU.createModifyableTableDescriptor(TestMobUtils.getTableName(test))
.setColumnFamily(familyDescriptor).build();
RegionSplitter.UniformSplit splitAlgo = new RegionSplitter.UniformSplit();
byte[][] splitKeys = splitAlgo.split(numRegions);
@ -152,6 +171,7 @@ public class TestMobCompactionWithDefaults {
public void tearDown() throws Exception {
admin.disableTable(tableDescriptor.getTableName());
admin.deleteTable(tableDescriptor.getTableName());
HTU.shutdownMiniCluster();
}
@Test
@ -167,12 +187,12 @@ public class TestMobCompactionWithDefaults {
@Test
public void testMobFileCompactionAfterSnapshotClone() throws InterruptedException, IOException {
final TableName clone = TableName.valueOf(test.getMethodName() + "-clone");
final TableName clone = TableName.valueOf(TestMobUtils.getTableName(test) + "-clone");
LOG.info("MOB compaction of cloned snapshot, " + description() + " started");
loadAndFlushThreeTimes(rows, table, famStr);
LOG.debug("Taking snapshot and cloning table {}", table);
admin.snapshot(test.getMethodName(), table);
admin.cloneSnapshot(test.getMethodName(), clone);
admin.snapshot(TestMobUtils.getTableName(test), table);
admin.cloneSnapshot(TestMobUtils.getTableName(test), clone);
assertEquals("Should have 3 hlinks per region in MOB area from snapshot clone", 3 * numRegions,
getNumberOfMobFiles(clone, famStr));
mobCompact(admin.getDescriptor(clone), familyDescriptor);
@ -185,12 +205,12 @@ public class TestMobCompactionWithDefaults {
@Test
public void testMobFileCompactionAfterSnapshotCloneAndFlush()
throws InterruptedException, IOException {
final TableName clone = TableName.valueOf(test.getMethodName() + "-clone");
final TableName clone = TableName.valueOf(TestMobUtils.getTableName(test) + "-clone");
LOG.info("MOB compaction of cloned snapshot after flush, " + description() + " started");
loadAndFlushThreeTimes(rows, table, famStr);
LOG.debug("Taking snapshot and cloning table {}", table);
admin.snapshot(test.getMethodName(), table);
admin.cloneSnapshot(test.getMethodName(), clone);
admin.snapshot(TestMobUtils.getTableName(test), table);
admin.cloneSnapshot(TestMobUtils.getTableName(test), clone);
assertEquals("Should have 3 hlinks per region in MOB area from snapshot clone", 3 * numRegions,
getNumberOfMobFiles(clone, famStr));
loadAndFlushThreeTimes(rows, clone, famStr);
@ -269,8 +289,11 @@ public class TestMobCompactionWithDefaults {
Thread.sleep(minAgeToArchive + 1000);
LOG.info("Cleaning up MOB files");
// Cleanup again
cleanerChore.cleanupObsoleteMobFiles(conf, table);
// run cleaner chore on each RS
for (ServerName sn : admin.getRegionServers()) {
HTU.getMiniHBaseCluster().getRegionServer(sn).getRSMobFileCleanerChore().chore();
}
assertEquals("After cleaning, we should have 1 MOB file per region based on size.", numRegions,
getNumberOfMobFiles(table, family));

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.mob;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
@ -166,14 +167,38 @@ public class TestMobFileCleanerChore {
Thread.sleep(minAgeToArchive + 1000);
LOG.info("Cleaning up MOB files");
// Cleanup again
// Cleanup
chore.cleanupObsoleteMobFiles(conf, table.getName());
// verify that nothing have happened
num = getNumberOfMobFiles(conf, table.getName(), new String(fam));
assertEquals(1, num);
assertEquals(4, num);
long scanned = scanTable();
assertEquals(30, scanned);
// add a MOB file to with a name refering to a non-existing region
Path extraMOBFile = MobTestUtil.generateMOBFileForRegion(conf, table.getName(),
familyDescriptor, "nonExistentRegion");
num = getNumberOfMobFiles(conf, table.getName(), new String(fam));
assertEquals(5, num);
LOG.info("Waiting for {}ms", minAgeToArchive + 1000);
Thread.sleep(minAgeToArchive + 1000);
LOG.info("Cleaning up MOB files");
chore.cleanupObsoleteMobFiles(conf, table.getName());
// check that the extra file got deleted
num = getNumberOfMobFiles(conf, table.getName(), new String(fam));
assertEquals(4, num);
FileSystem fs = FileSystem.get(conf);
assertFalse(fs.exists(extraMOBFile));
scanned = scanTable();
assertEquals(30, scanned);
}
private long getNumberOfMobFiles(Configuration conf, TableName tableName, String family)

View File

@ -26,11 +26,14 @@ import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@ -63,6 +66,7 @@ import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.RegionAsTable;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.MediumTests;
@ -76,12 +80,15 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Test mob store compaction
*/
@RunWith(Parameterized.class)
@Category(MediumTests.class)
public class TestMobStoreCompaction {
@ -106,15 +113,33 @@ public class TestMobStoreCompaction {
private final byte[] STARTROW = Bytes.toBytes(START_KEY);
private int compactionThreshold;
private Boolean useFileBasedSFT;
public TestMobStoreCompaction(Boolean useFileBasedSFT) {
this.useFileBasedSFT = useFileBasedSFT;
}
@Parameterized.Parameters
public static Collection<Boolean> data() {
Boolean[] data = { false, true };
return Arrays.asList(data);
}
private void init(Configuration conf, long mobThreshold) throws Exception {
if (useFileBasedSFT) {
conf.set(StoreFileTrackerFactory.TRACKER_IMPL,
"org.apache.hadoop.hbase.regionserver.storefiletracker.FileBasedStoreFileTracker");
}
this.conf = conf;
this.mobCellThreshold = mobThreshold;
HBaseTestingUtil UTIL = new HBaseTestingUtil(conf);
compactionThreshold = conf.getInt("hbase.hstore.compactionThreshold", 3);
familyDescriptor = ColumnFamilyDescriptorBuilder.newBuilder(COLUMN_FAMILY).setMobEnabled(true)
.setMobThreshold(mobThreshold).setMaxVersions(1).build();
tableDescriptor = UTIL.createModifyableTableDescriptor(name.getMethodName())
tableDescriptor = UTIL.createModifyableTableDescriptor(TestMobUtils.getTableName(name))
.modifyColumnFamily(familyDescriptor).build();
RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tableDescriptor.getTableName()).build();
@ -223,7 +248,7 @@ public class TestMobStoreCompaction {
Path basedir = new Path(hbaseRootDir, tableDescriptor.getTableName().getNameAsString());
List<Pair<byte[], String>> hfiles = new ArrayList<>(1);
for (int i = 0; i < compactionThreshold; i++) {
Path hpath = new Path(basedir, "hfile" + i);
Path hpath = new Path(basedir, UUID.randomUUID().toString().replace("-", ""));
hfiles.add(Pair.newPair(COLUMN_FAMILY, hpath.toString()));
createHFile(hpath, i, dummyData);
}

View File

@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSetMultimap;
@ -89,4 +90,8 @@ public class TestMobUtils {
assertTrue(testTable3Refs.contains("file3a"));
assertTrue(testTable3Refs.contains("file3b"));
}
public static String getTableName(TestName test) {
return test.getMethodName().replace("[", "-").replace("]", "");
}
}

View File

@ -0,0 +1,263 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mob;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.CompactionState;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Mob file cleaner chore test. 1. Creates MOB table 2. Load MOB data and flushes it N times 3. Runs
* major MOB compaction 4. Verifies that number of MOB files in a mob directory is N+1 5. Waits for
* a period of time larger than minimum age to archive 6. Runs Mob cleaner chore 7 Verifies that
* every old MOB file referenced from current RS was archived
*/
@Category(MediumTests.class)
public class TestRSMobFileCleanerChore {
private static final Logger LOG = LoggerFactory.getLogger(TestRSMobFileCleanerChore.class);
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestRSMobFileCleanerChore.class);
private HBaseTestingUtil HTU;
private final static String famStr = "f1";
private final static byte[] fam = Bytes.toBytes(famStr);
private final static byte[] qualifier = Bytes.toBytes("q1");
private final static long mobLen = 10;
private final static byte[] mobVal = Bytes
.toBytes("01234567890123456789012345678901234567890123456789012345678901234567890123456789");
private Configuration conf;
private TableDescriptor tableDescriptor;
private ColumnFamilyDescriptor familyDescriptor;
private Admin admin;
private Table table = null;
private RSMobFileCleanerChore chore;
private long minAgeToArchive = 10000;
public TestRSMobFileCleanerChore() {
}
@Before
public void setUp() throws Exception {
HTU = new HBaseTestingUtil();
conf = HTU.getConfiguration();
initConf();
HTU.startMiniCluster();
admin = HTU.getAdmin();
familyDescriptor = ColumnFamilyDescriptorBuilder.newBuilder(fam).setMobEnabled(true)
.setMobThreshold(mobLen).setMaxVersions(1).build();
tableDescriptor = HTU.createModifyableTableDescriptor("testMobCompactTable")
.setColumnFamily(familyDescriptor).build();
table = HTU.createTable(tableDescriptor, Bytes.toByteArrays("1"));
}
private void initConf() {
conf.setInt("hfile.format.version", 3);
conf.setLong(TimeToLiveHFileCleaner.TTL_CONF_KEY, 0);
conf.setInt("hbase.client.retries.number", 100);
conf.setInt("hbase.hregion.max.filesize", 200000000);
conf.setInt("hbase.hregion.memstore.flush.size", 800000);
conf.setInt("hbase.hstore.blockingStoreFiles", 150);
conf.setInt("hbase.hstore.compaction.throughput.lower.bound", 52428800);
conf.setInt("hbase.hstore.compaction.throughput.higher.bound", 2 * 52428800);
// conf.set(MobStoreEngine.DEFAULT_MOB_COMPACTOR_CLASS_KEY,
// FaultyMobStoreCompactor.class.getName());
// Disable automatic MOB compaction
conf.setLong(MobConstants.MOB_COMPACTION_CHORE_PERIOD, 0);
// Disable automatic MOB file cleaner chore
conf.setLong(MobConstants.MOB_CLEANER_PERIOD, 0);
// Set minimum age to archive to 10 sec
conf.setLong(MobConstants.MIN_AGE_TO_ARCHIVE_KEY, minAgeToArchive);
// Set compacted file discharger interval to a half minAgeToArchive
conf.setLong("hbase.hfile.compaction.discharger.interval", minAgeToArchive / 2);
}
private void loadData(int start, int num) {
try {
for (int i = 0; i < num; i++) {
Put p = new Put(Bytes.toBytes(start + i));
p.addColumn(fam, qualifier, mobVal);
table.put(p);
}
admin.flush(table.getName());
} catch (Exception e) {
LOG.error("MOB file cleaner chore test FAILED", e);
assertTrue(false);
}
}
@After
public void tearDown() throws Exception {
admin.disableTable(tableDescriptor.getTableName());
admin.deleteTable(tableDescriptor.getTableName());
HTU.shutdownMiniCluster();
}
@Test
public void testMobFileCleanerChore() throws InterruptedException, IOException {
loadData(0, 10);
loadData(10, 10);
// loadData(20, 10);
long num = getNumberOfMobFiles(conf, table.getName(), new String(fam));
assertEquals(2, num);
// Major compact
admin.majorCompact(tableDescriptor.getTableName(), fam);
// wait until compaction is complete
while (admin.getCompactionState(tableDescriptor.getTableName()) != CompactionState.NONE) {
Thread.sleep(100);
}
num = getNumberOfMobFiles(conf, table.getName(), new String(fam));
assertEquals(3, num);
// We have guarantee, that compcated file discharger will run during this pause
// because it has interval less than this wait time
LOG.info("Waiting for {}ms", minAgeToArchive + 1000);
Thread.sleep(minAgeToArchive + 1000);
LOG.info("Cleaning up MOB files");
ServerName serverUsed = null;
List<RegionInfo> serverRegions = null;
for (ServerName sn : admin.getRegionServers()) {
serverRegions = admin.getRegions(sn);
if (serverRegions != null && serverRegions.size() > 0) {
// filtering out non test table regions
serverRegions = serverRegions.stream().filter(r -> r.getTable() == table.getName())
.collect(Collectors.toList());
// if such one is found use this rs
if (serverRegions.size() > 0) {
serverUsed = sn;
}
break;
}
}
chore = HTU.getMiniHBaseCluster().getRegionServer(serverUsed).getRSMobFileCleanerChore();
chore.chore();
num = getNumberOfMobFiles(conf, table.getName(), new String(fam));
assertEquals(3 - serverRegions.size(), num);
long scanned = scanTable();
assertEquals(20, scanned);
// creating a MOB file not referenced from the current RS
Path extraMOBFile = MobTestUtil.generateMOBFileForRegion(conf, table.getName(),
familyDescriptor, "nonExistentRegion");
// verifying the new MOBfile is added
num = getNumberOfMobFiles(conf, table.getName(), new String(fam));
assertEquals(4 - serverRegions.size(), num);
FileSystem fs = FileSystem.get(conf);
assertTrue(fs.exists(extraMOBFile));
LOG.info("Waiting for {}ms", minAgeToArchive + 1000);
Thread.sleep(minAgeToArchive + 1000);
LOG.info("Cleaning up MOB files");
// running chore again
chore.chore();
// the chore should only archive old MOB files that were referenced from the current RS
// the unrelated MOB file is still there
num = getNumberOfMobFiles(conf, table.getName(), new String(fam));
assertEquals(4 - serverRegions.size(), num);
assertTrue(fs.exists(extraMOBFile));
scanned = scanTable();
assertEquals(20, scanned);
}
private long getNumberOfMobFiles(Configuration conf, TableName tableName, String family)
throws IOException {
FileSystem fs = FileSystem.get(conf);
Path dir = MobUtils.getMobFamilyPath(conf, tableName, family);
FileStatus[] stat = fs.listStatus(dir);
for (FileStatus st : stat) {
LOG.debug("DDDD MOB Directory content: {} size={}", st.getPath(), st.getLen());
}
LOG.debug("MOB Directory content total files: {}", stat.length);
return stat.length;
}
private long scanTable() {
try {
Result result;
ResultScanner scanner = table.getScanner(fam);
long counter = 0;
while ((result = scanner.next()) != null) {
assertTrue(Arrays.equals(result.getValue(fam, qualifier), mobVal));
counter++;
}
return counter;
} catch (Exception e) {
e.printStackTrace();
LOG.error("MOB file cleaner chore test FAILED");
if (HTU != null) {
assertTrue(false);
} else {
System.exit(-1);
}
}
return 0;
}
}