HBASE-26969:Eliminate MOB renames when SFT is enabled

Signed-off-by: Wellington Chevreuil <wchevreuil@apache.org>
Signed-off-by: Peter Somogyi <psomogyi@apache.org>

closes #4712
This commit is contained in:
BukrosSzabolcs 2022-06-21 11:18:55 +02:00 committed by Peter Somogyi
parent e239f70f2d
commit 9f3c52265d
22 changed files with 875 additions and 400 deletions

View File

@ -294,20 +294,20 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
* @param cleanSeqId When true, remove seqId(used to be mvcc) value which is <=
* smallestReadPoint
* @param throughputController The compaction throughput controller.
* @param major Is a major compaction.
* @param numofFilesToCompact the number of files to compact
* @param request compaction request.
* @param progress Progress reporter.
* @return Whether compaction ended; false if it was interrupted for any reason.
*/
@Override
protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController,
boolean major, int numofFilesToCompact, CompactionProgress progress) throws IOException {
CompactionRequestImpl request, CompactionProgress progress) throws IOException {
long bytesWrittenProgressForLog = 0;
long bytesWrittenProgressForShippedCall = 0;
// Clear old mob references
mobRefSet.get().clear();
boolean isUserRequest = userRequest.get();
boolean major = request.isAllFiles();
boolean compactMOBs = major && isUserRequest;
boolean discardMobMiss = conf.getBoolean(MobConstants.MOB_UNSAFE_DISCARD_MISS_KEY,
MobConstants.DEFAULT_MOB_DISCARD_MISS);
@ -351,12 +351,12 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
throughputController.start(compactionName);
KeyValueScanner kvs = (scanner instanceof KeyValueScanner) ? (KeyValueScanner) scanner : null;
long shippedCallSizeLimit =
(long) numofFilesToCompact * this.store.getColumnFamilyDescriptor().getBlocksize();
(long) request.getFiles().size() * this.store.getColumnFamilyDescriptor().getBlocksize();
Cell mobCell = null;
try {
mobFileWriter = newMobWriter(fd, major);
mobFileWriter = newMobWriter(fd, major, request.getWriterCreationTracker());
fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
do {
@ -426,7 +426,7 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
LOG.debug("Closing output MOB File, length={} file={}, store={}", len,
mobFileWriter.getPath().getName(), getStoreInfo());
commitOrAbortMobWriter(mobFileWriter, fd.maxSeqId, mobCells, major);
mobFileWriter = newMobWriter(fd, major);
mobFileWriter = newMobWriter(fd, major, request.getWriterCreationTracker());
fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
mobCells = 0;
}
@ -470,7 +470,7 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
long len = mobFileWriter.getPos();
if (len > maxMobFileSize) {
commitOrAbortMobWriter(mobFileWriter, fd.maxSeqId, mobCells, major);
mobFileWriter = newMobWriter(fd, major);
mobFileWriter = newMobWriter(fd, major, request.getWriterCreationTracker());
fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
mobCells = 0;
}
@ -522,7 +522,7 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
long len = mobFileWriter.getPos();
if (len > maxMobFileSize) {
commitOrAbortMobWriter(mobFileWriter, fd.maxSeqId, mobCells, major);
mobFileWriter = newMobWriter(fd, major);
mobFileWriter = newMobWriter(fd, major, request.getWriterCreationTracker());
fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
mobCells = 0;
}
@ -608,11 +608,16 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
}
}
private StoreFileWriter newMobWriter(FileDetails fd, boolean major) throws IOException {
private StoreFileWriter newMobWriter(FileDetails fd, boolean major,
Consumer<Path> writerCreationTracker) throws IOException {
try {
StoreFileWriter mobFileWriter = mobStore.createWriterInTmp(new Date(fd.latestPutTs),
fd.maxKeyCount, major ? majorCompactionCompression : minorCompactionCompression,
store.getRegionInfo().getStartKey(), true);
StoreFileWriter mobFileWriter = mobStore.getStoreEngine().requireWritingToTmpDirFirst()
? mobStore.createWriterInTmp(new Date(fd.latestPutTs), fd.maxKeyCount,
major ? majorCompactionCompression : minorCompactionCompression,
store.getRegionInfo().getStartKey(), true)
: mobStore.createWriter(new Date(fd.latestPutTs), fd.maxKeyCount,
major ? majorCompactionCompression : minorCompactionCompression,
store.getRegionInfo().getStartKey(), true, writerCreationTracker);
LOG.debug("New MOB writer created={} store={}", mobFileWriter.getPath().getName(),
getStoreInfo());
// Add reference we get for compact MOB

View File

@ -127,7 +127,8 @@ public class DefaultMobStoreFlusher extends DefaultStoreFlusher {
try {
// It's a mob store, flush the cells in a mob way. This is the difference of flushing
// between a normal and a mob store.
performMobFlush(snapshot, cacheFlushId, scanner, writer, status, throughputController);
performMobFlush(snapshot, cacheFlushId, scanner, writer, status, throughputController,
writerCreationTracker);
} catch (IOException ioe) {
e = ioe;
// throw the exception out
@ -171,16 +172,21 @@ public class DefaultMobStoreFlusher extends DefaultStoreFlusher {
*/
protected void performMobFlush(MemStoreSnapshot snapshot, long cacheFlushId,
InternalScanner scanner, StoreFileWriter writer, MonitoredTask status,
ThroughputController throughputController) throws IOException {
ThroughputController throughputController, Consumer<Path> writerCreationTracker)
throws IOException {
StoreFileWriter mobFileWriter = null;
int compactionKVMax =
conf.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT);
long mobCount = 0;
long mobSize = 0;
long time = snapshot.getTimeRangeTracker().getMax();
mobFileWriter = mobStore.createWriterInTmp(new Date(time), snapshot.getCellsCount(),
store.getColumnFamilyDescriptor().getCompressionType(), store.getRegionInfo().getStartKey(),
false);
mobFileWriter = mobStore.getStoreEngine().requireWritingToTmpDirFirst()
? mobStore.createWriterInTmp(new Date(time), snapshot.getCellsCount(),
store.getColumnFamilyDescriptor().getCompressionType(), store.getRegionInfo().getStartKey(),
false)
: mobStore.createWriter(new Date(time), snapshot.getCellsCount(),
store.getColumnFamilyDescriptor().getCompressionType(), store.getRegionInfo().getStartKey(),
false, writerCreationTracker);
// the target path is {tableName}/.mob/{cfName}/mobFiles
// the relative path is mobFiles
byte[] fileName = Bytes.toBytes(mobFileWriter.getPath().getName());

View File

@ -168,14 +168,15 @@ public class MobFileCleanerChore extends ScheduledChore {
maxCreationTimeToArchive, table);
}
FileSystem fs = FileSystem.get(conf);
Set<String> regionNames = new HashSet<>();
Path rootDir = CommonFSUtils.getRootDir(conf);
Path tableDir = CommonFSUtils.getTableDir(rootDir, table);
// How safe is this call?
List<Path> regionDirs = FSUtils.getRegionDirs(FileSystem.get(conf), tableDir);
List<Path> regionDirs = FSUtils.getRegionDirs(fs, tableDir);
Set<String> allActiveMobFileName = new HashSet<String>();
FileSystem fs = FileSystem.get(conf);
for (Path regionPath : regionDirs) {
regionNames.add(regionPath.getName());
for (ColumnFamilyDescriptor hcd : list) {
String family = hcd.getNameAsString();
Path storePath = new Path(regionPath, family);
@ -203,13 +204,26 @@ public class MobFileCleanerChore extends ScheduledChore {
for (Path pp : storeFiles) {
currentPath = pp;
LOG.trace("Store file: {}", pp);
HStoreFile sf =
new HStoreFile(fs, pp, conf, CacheConfig.DISABLED, BloomType.NONE, true);
sf.initReader();
byte[] mobRefData = sf.getMetadataValue(HStoreFile.MOB_FILE_REFS);
byte[] bulkloadMarkerData = sf.getMetadataValue(HStoreFile.BULKLOAD_TASK_KEY);
// close store file to avoid memory leaks
sf.closeStoreFile(true);
HStoreFile sf = null;
byte[] mobRefData = null;
byte[] bulkloadMarkerData = null;
try {
sf = new HStoreFile(fs, pp, conf, CacheConfig.DISABLED, BloomType.NONE, true);
sf.initReader();
mobRefData = sf.getMetadataValue(HStoreFile.MOB_FILE_REFS);
bulkloadMarkerData = sf.getMetadataValue(HStoreFile.BULKLOAD_TASK_KEY);
// close store file to avoid memory leaks
sf.closeStoreFile(true);
} catch (IOException ex) {
// When FileBased SFT is active the store dir can contain corrupted or incomplete
// files. So read errors are expected. We just skip these files.
if (ex instanceof FileNotFoundException) {
throw ex;
}
LOG.debug("Failed to get mob data from file: {} due to error.", pp.toString(),
ex);
continue;
}
if (mobRefData == null) {
if (bulkloadMarkerData == null) {
LOG.warn("Found old store file with no MOB_FILE_REFS: {} - "
@ -264,9 +278,11 @@ public class MobFileCleanerChore extends ScheduledChore {
while (rit.hasNext()) {
LocatedFileStatus lfs = rit.next();
Path p = lfs.getPath();
if (!allActiveMobFileName.contains(p.getName())) {
// MOB is not in a list of active references, but it can be too
// fresh, skip it in this case
String[] mobParts = p.getName().split("_");
String regionName = mobParts[mobParts.length - 1];
if (!regionNames.contains(regionName)) {
// MOB belonged to a region no longer hosted
long creationTime = fs.getFileStatus(p).getModificationTime();
if (creationTime < maxCreationTimeToArchive) {
LOG.trace("Archiving MOB file {} creation time={}", p,
@ -277,7 +293,7 @@ public class MobFileCleanerChore extends ScheduledChore {
fs.getFileStatus(p).getModificationTime());
}
} else {
LOG.trace("Keeping active MOB file: {}", p);
LOG.trace("Keeping MOB file with existing region: {}", p);
}
}
LOG.info(" MOB Cleaner found {} files to archive for table={} family={}", toArchive.size(),

View File

@ -33,6 +33,7 @@ import java.util.Date;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.function.Consumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@ -584,6 +585,33 @@ public final class MobUtils {
CacheConfig cacheConfig, Encryption.Context cryptoContext, ChecksumType checksumType,
int bytesPerChecksum, int blocksize, BloomType bloomType, boolean isCompaction)
throws IOException {
return createWriter(conf, fs, family, path, maxKeyCount, compression, cacheConfig,
cryptoContext, checksumType, bytesPerChecksum, blocksize, bloomType, isCompaction, null);
}
/**
* Creates a writer for the mob file in temp directory.
* @param conf The current configuration.
* @param fs The current file system.
* @param family The descriptor of the current column family.
* @param path The path for a temp directory.
* @param maxKeyCount The key count.
* @param compression The compression algorithm.
* @param cacheConfig The current cache config.
* @param cryptoContext The encryption context.
* @param checksumType The checksum type.
* @param bytesPerChecksum The bytes per checksum.
* @param blocksize The HFile block size.
* @param bloomType The bloom filter type.
* @param isCompaction If the writer is used in compaction.
* @param writerCreationTracker to track the current writer in the store
* @return The writer for the mob file.
*/
public static StoreFileWriter createWriter(Configuration conf, FileSystem fs,
ColumnFamilyDescriptor family, Path path, long maxKeyCount, Compression.Algorithm compression,
CacheConfig cacheConfig, Encryption.Context cryptoContext, ChecksumType checksumType,
int bytesPerChecksum, int blocksize, BloomType bloomType, boolean isCompaction,
Consumer<Path> writerCreationTracker) throws IOException {
if (compression == null) {
compression = HFile.DEFAULT_COMPRESSION_ALGORITHM;
}
@ -602,7 +630,8 @@ public final class MobUtils {
.withCreateTime(EnvironmentEdgeManager.currentTime()).build();
StoreFileWriter w = new StoreFileWriter.Builder(conf, writerCacheConf, fs).withFilePath(path)
.withBloomType(bloomType).withMaxKeyCount(maxKeyCount).withFileContext(hFileContext).build();
.withBloomType(bloomType).withMaxKeyCount(maxKeyCount).withFileContext(hFileContext)
.withWriterCreationTracker(writerCreationTracker).build();
return w;
}

View File

@ -0,0 +1,272 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mob;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.HFileArchiver;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.collect.SetMultimap;
/**
* The class RSMobFileCleanerChore for running cleaner regularly to remove the obsolete (files which
* have no active references to) mob files that were referenced from the current RS.
*/
@InterfaceAudience.Private
public class RSMobFileCleanerChore extends ScheduledChore {
private static final Logger LOG = LoggerFactory.getLogger(RSMobFileCleanerChore.class);
private final HRegionServer rs;
public RSMobFileCleanerChore(HRegionServer rs) {
super(rs.getServerName() + "-MobFileCleanerChore", rs,
rs.getConfiguration().getInt(MobConstants.MOB_CLEANER_PERIOD,
MobConstants.DEFAULT_MOB_CLEANER_PERIOD),
Math.round(rs.getConfiguration().getInt(MobConstants.MOB_CLEANER_PERIOD,
MobConstants.DEFAULT_MOB_CLEANER_PERIOD)
* ((ThreadLocalRandom.current().nextDouble() + 0.5D))),
TimeUnit.SECONDS);
// to prevent a load spike on the fs the initial delay is modified by +/- 50%
this.rs = rs;
}
public RSMobFileCleanerChore() {
this.rs = null;
}
@Override
protected void chore() {
long minAgeToArchive = rs.getConfiguration().getLong(MobConstants.MIN_AGE_TO_ARCHIVE_KEY,
MobConstants.DEFAULT_MIN_AGE_TO_ARCHIVE);
// We check only those MOB files, which creation time is less
// than maxCreationTimeToArchive. This is a current time - 1h. 1 hour gap
// gives us full confidence that all corresponding store files will
// exist at the time cleaning procedure begins and will be examined.
// So, if MOB file creation time is greater than this maxTimeToArchive,
// this will be skipped and won't be archived.
long maxCreationTimeToArchive = EnvironmentEdgeManager.currentTime() - minAgeToArchive;
TableDescriptors htds = rs.getTableDescriptors();
try {
FileSystem fs = FileSystem.get(rs.getConfiguration());
Map<String, TableDescriptor> map = null;
try {
map = htds.getAll();
} catch (IOException e) {
LOG.error("MobFileCleanerChore failed", e);
return;
}
Map<String, Map<String, List<String>>> referencedMOBs = new HashMap<>();
for (TableDescriptor htd : map.values()) {
// Now clean obsolete files for a table
LOG.info("Cleaning obsolete MOB files from table={}", htd.getTableName());
List<ColumnFamilyDescriptor> list = MobUtils.getMobColumnFamilies(htd);
List<HRegion> regions = rs.getRegions(htd.getTableName());
for (HRegion region : regions) {
for (ColumnFamilyDescriptor hcd : list) {
HStore store = region.getStore(hcd.getName());
Collection<HStoreFile> sfs = store.getStorefiles();
Set<String> regionMobs = new HashSet<String>();
Path currentPath = null;
try {
// collectinng referenced MOBs
for (HStoreFile sf : sfs) {
currentPath = sf.getPath();
sf.initReader();
byte[] mobRefData = sf.getMetadataValue(HStoreFile.MOB_FILE_REFS);
byte[] bulkloadMarkerData = sf.getMetadataValue(HStoreFile.BULKLOAD_TASK_KEY);
// close store file to avoid memory leaks
sf.closeStoreFile(true);
if (mobRefData == null) {
if (bulkloadMarkerData == null) {
LOG.warn(
"Found old store file with no MOB_FILE_REFS: {} - "
+ "can not proceed until all old files will be MOB-compacted.",
currentPath);
return;
} else {
LOG.debug("Skipping file without MOB references (bulkloaded file):{}",
currentPath);
continue;
}
}
// file may or may not have MOB references, but was created by the distributed
// mob compaction code.
try {
SetMultimap<TableName, String> mobs =
MobUtils.deserializeMobFileRefs(mobRefData).build();
LOG.debug("Found {} mob references for store={}", mobs.size(), sf);
LOG.trace("Specific mob references found for store={} : {}", sf, mobs);
regionMobs.addAll(mobs.values());
} catch (RuntimeException exception) {
throw new IOException("failure getting mob references for hfile " + sf,
exception);
}
}
// collecting files, MOB included currently being written
regionMobs.addAll(store.getStoreFilesBeingWritten().stream()
.map(path -> path.getName()).collect(Collectors.toList()));
referencedMOBs
.computeIfAbsent(hcd.getNameAsString(), cf -> new HashMap<String, List<String>>())
.computeIfAbsent(region.getRegionInfo().getEncodedName(), name -> new ArrayList<>())
.addAll(regionMobs);
} catch (FileNotFoundException e) {
LOG.warn(
"Missing file:{} Starting MOB cleaning cycle from the beginning" + " due to error",
currentPath, e);
regionMobs.clear();
continue;
} catch (IOException e) {
LOG.error("Failed to clean the obsolete mob files for table={}",
htd.getTableName().getNameAsString(), e);
}
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("Found: {} active mob refs for table={}",
referencedMOBs.values().stream().map(inner -> inner.values())
.flatMap(lists -> lists.stream()).mapToInt(lists -> lists.size()).sum(),
htd.getTableName().getNameAsString());
}
if (LOG.isTraceEnabled()) {
referencedMOBs.values().stream().forEach(innerMap -> innerMap.values().stream()
.forEach(mobFileList -> mobFileList.stream().forEach(LOG::trace)));
}
// collect regions referencing MOB files belonging to the current rs
Set<String> regionsCovered = new HashSet<>();
referencedMOBs.values().stream()
.forEach(regionMap -> regionsCovered.addAll(regionMap.keySet()));
for (ColumnFamilyDescriptor hcd : list) {
List<Path> toArchive = new ArrayList<Path>();
String family = hcd.getNameAsString();
Path dir = MobUtils.getMobFamilyPath(rs.getConfiguration(), htd.getTableName(), family);
RemoteIterator<LocatedFileStatus> rit = fs.listLocatedStatus(dir);
while (rit.hasNext()) {
LocatedFileStatus lfs = rit.next();
Path p = lfs.getPath();
String[] mobParts = p.getName().split("_");
String regionName = mobParts[mobParts.length - 1];
// skip MOB files not belonging to a region assigned to the current rs
if (!regionsCovered.contains(regionName)) {
LOG.trace("MOB file does not belong to current rs: {}", p);
continue;
}
// check active or actively written mob files
Map<String, List<String>> cfMobs = referencedMOBs.get(hcd.getNameAsString());
if (
cfMobs != null && cfMobs.get(regionName) != null
&& cfMobs.get(regionName).contains(p.getName())
) {
LOG.trace("Keeping active MOB file: {}", p);
continue;
}
// MOB is not in a list of active references, but it can be too
// fresh, skip it in this case
long creationTime = fs.getFileStatus(p).getModificationTime();
if (creationTime < maxCreationTimeToArchive) {
LOG.trace("Archiving MOB file {} creation time={}", p,
(fs.getFileStatus(p).getModificationTime()));
toArchive.add(p);
} else {
LOG.trace("Skipping fresh file: {}. Creation time={}", p,
fs.getFileStatus(p).getModificationTime());
}
}
LOG.info(" MOB Cleaner found {} files to archive for table={} family={}",
toArchive.size(), htd.getTableName().getNameAsString(), family);
archiveMobFiles(rs.getConfiguration(), htd.getTableName(), family.getBytes(), toArchive);
LOG.info(" MOB Cleaner archived {} files, table={} family={}", toArchive.size(),
htd.getTableName().getNameAsString(), family);
}
LOG.info("Cleaning obsolete MOB files finished for table={}", htd.getTableName());
}
} catch (IOException e) {
LOG.error("MOB Cleaner failed when trying to access the file system", e);
}
}
/**
* Archives the mob files.
* @param conf The current configuration.
* @param tableName The table name.
* @param family The name of the column family.
* @param storeFiles The files to be archived.
* @throws IOException exception
*/
public void archiveMobFiles(Configuration conf, TableName tableName, byte[] family,
List<Path> storeFiles) throws IOException {
if (storeFiles.size() == 0) {
// nothing to remove
LOG.debug("Skipping archiving old MOB files - no files found for table={} cf={}", tableName,
Bytes.toString(family));
return;
}
Path mobTableDir = CommonFSUtils.getTableDir(MobUtils.getMobHome(conf), tableName);
FileSystem fs = storeFiles.get(0).getFileSystem(conf);
for (Path p : storeFiles) {
LOG.debug("MOB Cleaner is archiving: {}", p);
HFileArchiver.archiveStoreFile(conf, fs, MobUtils.getMobRegionInfo(tableName), mobTableDir,
family, p);
}
}
}

View File

@ -28,6 +28,7 @@ import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@ -184,7 +185,27 @@ public class HMobStore extends HStore {
}
Path path = getTempDir();
return createWriterInTmp(MobUtils.formatDate(date), path, maxKeyCount, compression, startKey,
isCompaction);
isCompaction, null);
}
/**
* Creates the writer for the mob file in the mob family directory.
* @param date The latest date of written cells.
* @param maxKeyCount The key count.
* @param compression The compression algorithm.
* @param startKey The start key.
* @param isCompaction If the writer is used in compaction.
* @return The writer for the mob file. n
*/
public StoreFileWriter createWriter(Date date, long maxKeyCount,
Compression.Algorithm compression, byte[] startKey, boolean isCompaction,
Consumer<Path> writerCreationTracker) throws IOException {
if (startKey == null) {
startKey = HConstants.EMPTY_START_ROW;
}
Path path = getPath();
return createWriterInTmp(MobUtils.formatDate(date), path, maxKeyCount, compression, startKey,
isCompaction, writerCreationTracker);
}
/**
@ -198,11 +219,13 @@ public class HMobStore extends HStore {
* @return The writer for the mob file. n
*/
public StoreFileWriter createWriterInTmp(String date, Path basePath, long maxKeyCount,
Compression.Algorithm compression, byte[] startKey, boolean isCompaction) throws IOException {
Compression.Algorithm compression, byte[] startKey, boolean isCompaction,
Consumer<Path> writerCreationTracker) throws IOException {
MobFileName mobFileName =
MobFileName.create(startKey, date, UUID.randomUUID().toString().replaceAll("-", ""),
getHRegion().getRegionInfo().getEncodedName());
return createWriterInTmp(mobFileName, basePath, maxKeyCount, compression, isCompaction);
return createWriterInTmp(mobFileName, basePath, maxKeyCount, compression, isCompaction,
writerCreationTracker);
}
/**
@ -214,13 +237,15 @@ public class HMobStore extends HStore {
* @param isCompaction If the writer is used in compaction.
* @return The writer for the mob file. n
*/
public StoreFileWriter createWriterInTmp(MobFileName mobFileName, Path basePath, long maxKeyCount,
Compression.Algorithm compression, boolean isCompaction) throws IOException {
Compression.Algorithm compression, boolean isCompaction, Consumer<Path> writerCreationTracker)
throws IOException {
return MobUtils.createWriter(conf, getFileSystem(), getColumnFamilyDescriptor(),
new Path(basePath, mobFileName.getFileName()), maxKeyCount, compression, getCacheConfig(),
getStoreContext().getEncryptionContext(), StoreUtils.getChecksumType(conf),
StoreUtils.getBytesPerChecksum(conf), getStoreContext().getBlockSize(), BloomType.NONE,
isCompaction);
isCompaction, writerCreationTracker);
}
/**
@ -234,6 +259,10 @@ public class HMobStore extends HStore {
}
Path dstPath = new Path(targetPath, sourceFile.getName());
validateMobFile(sourceFile);
if (sourceFile.equals(targetPath)) {
LOG.info("File is already in the destination dir: {}", sourceFile);
return;
}
LOG.info(" FLUSH Renaming flushed file from {} to {}", sourceFile, dstPath);
Path parent = dstPath.getParent();
if (!getFileSystem().exists(parent)) {

View File

@ -131,6 +131,7 @@ import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.LoadBalancer;
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
import org.apache.hadoop.hbase.mob.MobFileCache;
import org.apache.hadoop.hbase.mob.RSMobFileCleanerChore;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
import org.apache.hadoop.hbase.namequeues.SlowLogTableOpsChore;
@ -553,6 +554,8 @@ public class HRegionServer extends Thread
private BrokenStoreFileCleaner brokenStoreFileCleaner;
private RSMobFileCleanerChore rsMobFileCleanerChore;
@InterfaceAudience.Private
CompactedHFilesDischarger compactedFileDischarger;
@ -2181,6 +2184,10 @@ public class HRegionServer extends Thread
choreService.scheduleChore(brokenStoreFileCleaner);
}
if (this.rsMobFileCleanerChore != null) {
choreService.scheduleChore(rsMobFileCleanerChore);
}
// Leases is not a Thread. Internally it runs a daemon thread. If it gets
// an unhandled exception, it will just exit.
Threads.setDaemonThreadRunning(this.leaseManager, getName() + ".leaseChecker",
@ -2277,6 +2284,8 @@ public class HRegionServer extends Thread
new BrokenStoreFileCleaner((int) (brokenStoreFileCleanerDelay + jitterValue),
brokenStoreFileCleanerPeriod, this, conf, this);
this.rsMobFileCleanerChore = new RSMobFileCleanerChore(this);
registerConfigurationObservers();
}
@ -2759,6 +2768,7 @@ public class HRegionServer extends Thread
shutdownChore(storefileRefresher);
shutdownChore(fsUtilizationChore);
shutdownChore(slowLogTableOpsChore);
shutdownChore(rsMobFileCleanerChore);
// cancel the remaining scheduled chores (in case we missed out any)
// TODO: cancel will not cleanup the chores, so we need make sure we do not miss any
choreService.shutdown();
@ -4022,4 +4032,9 @@ public class HRegionServer extends Thread
public BrokenStoreFileCleaner getBrokenStoreFileCleaner() {
return brokenStoreFileCleaner;
}
@InterfaceAudience.Private
public RSMobFileCleanerChore getRSMobFileCleanerChore() {
return rsMobFileCleanerChore;
}
}

View File

@ -2421,7 +2421,7 @@ public class HStore
* {@link BrokenStoreFileCleaner} to prevent deleting the these files as they are not present in
* SFT yet.
*/
Set<Path> getStoreFilesBeingWritten() {
public Set<Path> getStoreFilesBeingWritten() {
return storeFileWriterCreationTrackers.stream().flatMap(t -> t.get().stream())
.collect(Collectors.toSet());
}

View File

@ -361,7 +361,7 @@ public abstract class Compactor<T extends CellSink> {
writer = sinkFactory.createWriter(scanner, fd, dropCache, request.isMajor(),
request.getWriterCreationTracker());
finished = performCompaction(fd, scanner, writer, smallestReadPoint, cleanSeqId,
throughputController, request.isAllFiles(), request.getFiles().size(), progress);
throughputController, request, progress);
if (!finished) {
throw new InterruptedIOException("Aborting compaction of store " + store + " in region "
+ store.getRegionInfo().getRegionNameAsString() + " because it was interrupted.");
@ -400,20 +400,19 @@ public abstract class Compactor<T extends CellSink> {
/**
* Performs the compaction.
* @param fd FileDetails of cell sink writer
* @param scanner Where to read from.
* @param writer Where to write to.
* @param smallestReadPoint Smallest read point.
* @param cleanSeqId When true, remove seqId(used to be mvcc) value which is &lt;=
* smallestReadPoint
* @param major Is a major compaction.
* @param numofFilesToCompact the number of files to compact
* @param progress Progress reporter.
* @param fd FileDetails of cell sink writer
* @param scanner Where to read from.
* @param writer Where to write to.
* @param smallestReadPoint Smallest read point.
* @param cleanSeqId When true, remove seqId(used to be mvcc) value which is &lt;=
* smallestReadPoint
* @param request compaction request.
* @param progress Progress reporter.
* @return Whether compaction ended; false if it was interrupted for some reason.
*/
protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController,
boolean major, int numofFilesToCompact, CompactionProgress progress) throws IOException {
CompactionRequestImpl request, CompactionProgress progress) throws IOException {
assert writer instanceof ShipperListener;
long bytesWrittenProgressForLog = 0;
long bytesWrittenProgressForShippedCall = 0;
@ -435,7 +434,7 @@ public abstract class Compactor<T extends CellSink> {
throughputController.start(compactionName);
KeyValueScanner kvs = (scanner instanceof KeyValueScanner) ? (KeyValueScanner) scanner : null;
long shippedCallSizeLimit =
(long) numofFilesToCompact * this.store.getColumnFamilyDescriptor().getBlocksize();
(long) request.getFiles().size() * this.store.getColumnFamilyDescriptor().getBlocksize();
try {
do {
hasMore = scanner.next(cells, scannerContext);

View File

@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.regionserver.ShipperListener;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.compactions.CloseChecker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.util.Bytes;
@ -91,8 +92,9 @@ public class FaultyMobStoreCompactor extends DefaultMobStoreCompactor {
@Override
protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController,
boolean major, int numofFilesToCompact, CompactionProgress progress) throws IOException {
CompactionRequestImpl request, CompactionProgress progress) throws IOException {
boolean major = request.isAllFiles();
totalCompactions.incrementAndGet();
if (major) {
totalMajorCompactions.incrementAndGet();
@ -143,7 +145,7 @@ public class FaultyMobStoreCompactor extends DefaultMobStoreCompactor {
throughputController.start(compactionName);
KeyValueScanner kvs = (scanner instanceof KeyValueScanner) ? (KeyValueScanner) scanner : null;
long shippedCallSizeLimit =
(long) numofFilesToCompact * this.store.getColumnFamilyDescriptor().getBlocksize();
(long) request.getFiles().size() * this.store.getColumnFamilyDescriptor().getBlocksize();
Cell mobCell = null;

View File

@ -18,17 +18,26 @@
package org.apache.hadoop.hbase.mob;
import java.io.IOException;
import java.util.Date;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.crypto.Encryption;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@ -112,4 +121,17 @@ public class MobTestUtil {
scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
return util.countRows(table, scan);
}
public static Path generateMOBFileForRegion(Configuration conf, TableName tableName,
ColumnFamilyDescriptor familyDescriptor, String regionName) throws IOException {
Date date = new Date();
String dateStr = MobUtils.formatDate(date);
FileSystem fs = FileSystem.get(conf);
Path cfMOBDir = MobUtils.getMobFamilyPath(conf, tableName, familyDescriptor.getNameAsString());
StoreFileWriter writer = MobUtils.createWriter(conf, fs, familyDescriptor, dateStr, cfMOBDir,
1000L, Compression.Algorithm.NONE, "startKey", CacheConfig.DISABLED, Encryption.Context.NONE,
false, "");
writer.close();
return writer.getPath();
}
}

View File

@ -17,30 +17,37 @@
*/
package org.apache.hadoop.hbase.mob;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.After;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
@Category(LargeTests.class)
public class TestDefaultMobStoreFlusher {
@ -60,41 +67,52 @@ public class TestDefaultMobStoreFlusher {
@Rule
public TestName name = new TestName();
@BeforeClass
public static void setUpBeforeClass() throws Exception {
protected Boolean useFileBasedSFT;
public TestDefaultMobStoreFlusher(Boolean useFileBasedSFT) {
this.useFileBasedSFT = useFileBasedSFT;
}
@Parameterized.Parameters
public static Collection<Boolean> data() {
Boolean[] data = { false, true };
return Arrays.asList(data);
}
@Before
public void setUpBefore() throws Exception {
if (useFileBasedSFT) {
TEST_UTIL.getConfiguration().set(StoreFileTrackerFactory.TRACKER_IMPL,
"org.apache.hadoop.hbase.regionserver.storefiletracker.FileBasedStoreFileTracker");
}
TEST_UTIL.startMiniCluster(1);
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
@After
public void tearDownAfter() throws Exception {
TEST_UTIL.shutdownMiniCluster();
}
@Test
public void testFlushNonMobFile() throws Exception {
final TableName tableName = TableName.valueOf(name.getMethodName());
HTableDescriptor desc = new HTableDescriptor(tableName);
HColumnDescriptor hcd = new HColumnDescriptor(family);
hcd.setMaxVersions(4);
desc.addFamily(hcd);
testFlushFile(desc);
final TableName tableName = TableName.valueOf(TestMobUtils.getTableName(name));
TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName)
.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(4).build())
.build();
testFlushFile(tableDescriptor);
}
@Test
public void testFlushMobFile() throws Exception {
final TableName tableName = TableName.valueOf(name.getMethodName());
HTableDescriptor desc = new HTableDescriptor(tableName);
HColumnDescriptor hcd = new HColumnDescriptor(family);
hcd.setMobEnabled(true);
hcd.setMobThreshold(3L);
hcd.setMaxVersions(4);
desc.addFamily(hcd);
testFlushFile(desc);
final TableName tableName = TableName.valueOf(TestMobUtils.getTableName(name));
TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName)
.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(family).setMobEnabled(true)
.setMobThreshold(3L).setMaxVersions(4).build())
.build();
testFlushFile(tableDescriptor);
}
private void testFlushFile(HTableDescriptor htd) throws Exception {
private void testFlushFile(TableDescriptor htd) throws Exception {
Table table = null;
try {
table = TEST_UTIL.createTable(htd, null);

View File

@ -1,218 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mob;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.Arrays;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.CompactionState;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.junit.After;
import org.junit.Before;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Mob file compaction base test. 1. Enables batch mode for regular MOB compaction, Sets batch size
* to 7 regions. (Optional) 2. Disables periodic MOB compactions, sets minimum age to archive to 10
* sec 3. Creates MOB table with 20 regions 4. Loads MOB data (randomized keys, 1000 rows), flushes
* data. 5. Repeats 4. two more times 6. Verifies that we have 20 *3 = 60 mob files (equals to
* number of regions x 3) 7. Runs major MOB compaction. 8. Verifies that number of MOB files in a
* mob directory is 20 x4 = 80 9. Waits for a period of time larger than minimum age to archive 10.
* Runs Mob cleaner chore 11 Verifies that number of MOB files in a mob directory is 20. 12 Runs
* scanner and checks all 3 * 1000 rows.
*/
@SuppressWarnings("deprecation")
public abstract class TestMobCompactionBase {
private static final Logger LOG = LoggerFactory.getLogger(TestMobCompactionBase.class);
protected HBaseTestingUtility HTU;
protected final static String famStr = "f1";
protected final static byte[] fam = Bytes.toBytes(famStr);
protected final static byte[] qualifier = Bytes.toBytes("q1");
protected final static long mobLen = 10;
protected final static byte[] mobVal = Bytes
.toBytes("01234567890123456789012345678901234567890123456789012345678901234567890123456789");
protected Configuration conf;
protected HTableDescriptor hdt;
private HColumnDescriptor hcd;
protected Admin admin;
protected Table table = null;
protected long minAgeToArchive = 10000;
protected int numRegions = 20;
protected int rows = 1000;
protected MobFileCleanerChore cleanerChore;
public TestMobCompactionBase() {
}
@Before
public void setUp() throws Exception {
HTU = new HBaseTestingUtility();
hdt = HTU.createTableDescriptor(getClass().getName());
conf = HTU.getConfiguration();
initConf();
HTU.startMiniCluster();
admin = HTU.getAdmin();
cleanerChore = new MobFileCleanerChore();
hcd = new HColumnDescriptor(fam);
hcd.setMobEnabled(true);
hcd.setMobThreshold(mobLen);
hcd.setMaxVersions(1);
hdt.addFamily(hcd);
RegionSplitter.UniformSplit splitAlgo = new RegionSplitter.UniformSplit();
byte[][] splitKeys = splitAlgo.split(numRegions);
table = HTU.createTable(hdt, splitKeys);
}
protected void initConf() {
conf.setInt("hfile.format.version", 3);
// Disable automatic MOB compaction
conf.setLong(MobConstants.MOB_COMPACTION_CHORE_PERIOD, 0);
// Disable automatic MOB file cleaner chore
conf.setLong(MobConstants.MOB_CLEANER_PERIOD, 0);
// Set minimum age to archive to 10 sec
conf.setLong(MobConstants.MIN_AGE_TO_ARCHIVE_KEY, minAgeToArchive);
// Set compacted file discharger interval to a half minAgeToArchive
conf.setLong("hbase.hfile.compaction.discharger.interval", minAgeToArchive / 2);
}
private void loadData(int num) {
Random r = new Random();
try {
LOG.info("Started loading {} rows", num);
for (int i = 0; i < num; i++) {
byte[] key = new byte[32];
r.nextBytes(key);
Put p = new Put(key);
p.addColumn(fam, qualifier, mobVal);
table.put(p);
}
admin.flush(table.getName());
LOG.info("Finished loading {} rows", num);
} catch (Exception e) {
LOG.error("MOB file compaction chore test FAILED", e);
fail("MOB file compaction chore test FAILED");
}
}
@After
public void tearDown() throws Exception {
admin.disableTable(hdt.getTableName());
admin.deleteTable(hdt.getTableName());
HTU.shutdownMiniCluster();
}
public void baseTestMobFileCompaction() throws InterruptedException, IOException {
// Load and flush data 3 times
loadData(rows);
loadData(rows);
loadData(rows);
long num = getNumberOfMobFiles(conf, table.getName(), new String(fam));
assertEquals(numRegions * 3, num);
// Major MOB compact
mobCompact(admin, hdt, hcd);
// wait until compaction is complete
while (admin.getCompactionState(hdt.getTableName()) != CompactionState.NONE) {
Thread.sleep(100);
}
num = getNumberOfMobFiles(conf, table.getName(), new String(fam));
assertEquals(numRegions * 4, num);
// We have guarantee, that compacted file discharger will run during this pause
// because it has interval less than this wait time
LOG.info("Waiting for {}ms", minAgeToArchive + 1000);
Thread.sleep(minAgeToArchive + 1000);
LOG.info("Cleaning up MOB files");
// Cleanup again
cleanerChore.cleanupObsoleteMobFiles(conf, table.getName());
num = getNumberOfMobFiles(conf, table.getName(), new String(fam));
assertEquals(numRegions, num);
long scanned = scanTable();
assertEquals(3 * rows, scanned);
}
protected abstract void mobCompact(Admin admin2, HTableDescriptor hdt2, HColumnDescriptor hcd2)
throws IOException, InterruptedException;
protected long getNumberOfMobFiles(Configuration conf, TableName tableName, String family)
throws IOException {
FileSystem fs = FileSystem.get(conf);
Path dir = MobUtils.getMobFamilyPath(conf, tableName, family);
FileStatus[] stat = fs.listStatus(dir);
for (FileStatus st : stat) {
LOG.debug("MOB Directory content: {}", st.getPath());
}
LOG.debug("MOB Directory content total files: {}", stat.length);
return stat.length;
}
protected long scanTable() {
try {
Result result;
ResultScanner scanner = table.getScanner(fam);
long counter = 0;
while ((result = scanner.next()) != null) {
assertTrue(Arrays.equals(result.getValue(fam, qualifier), mobVal));
counter++;
}
return counter;
} catch (Exception e) {
LOG.error("MOB file compaction test FAILED", e);
if (HTU != null) {
fail(e.getMessage());
} else {
System.exit(-1);
}
}
return 0;
}
}

View File

@ -17,10 +17,8 @@
*/
package org.apache.hadoop.hbase.mob;
import java.io.IOException;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.experimental.categories.Category;
@ -40,12 +38,13 @@ public class TestMobCompactionOptMode extends TestMobCompactionWithDefaults {
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestMobCompactionOptMode.class);
@BeforeClass
public static void configureOptimizedCompaction() throws InterruptedException, IOException {
HTU.shutdownMiniHBaseCluster();
public TestMobCompactionOptMode(Boolean useFileBasedSFT) {
super(useFileBasedSFT);
}
protected void additonalConfigSetup() {
conf.set(MobConstants.MOB_COMPACTION_TYPE_KEY, MobConstants.OPTIMIZED_MOB_COMPACTION_TYPE);
conf.setLong(MobConstants.MOB_COMPACTION_MAX_FILE_SIZE_KEY, 1000000);
HTU.startMiniHBaseCluster();
}
@Override

View File

@ -23,9 +23,10 @@ import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -39,6 +40,7 @@ import org.slf4j.LoggerFactory;
* time larger than minimum age to archive 10. Runs Mob cleaner chore 11 Verifies that number of MOB
* files in a mob directory is 20. 12 Runs scanner and checks all 3 * 1000 rows.
*/
@RunWith(Parameterized.class)
@Category(LargeTests.class)
public class TestMobCompactionOptRegionBatchMode extends TestMobCompactionWithDefaults {
private static final Logger LOG =
@ -50,20 +52,20 @@ public class TestMobCompactionOptRegionBatchMode extends TestMobCompactionWithDe
private static final int batchSize = 7;
private MobFileCompactionChore compactionChore;
public TestMobCompactionOptRegionBatchMode(Boolean useFileBasedSFT) {
super(useFileBasedSFT);
}
@Before
public void setUp() throws Exception {
super.setUp();
compactionChore = new MobFileCompactionChore(conf, batchSize);
}
@BeforeClass
public static void configureOptimizedCompactionAndBatches()
throws InterruptedException, IOException {
HTU.shutdownMiniHBaseCluster();
protected void additonalConfigSetup() {
conf.setInt(MobConstants.MOB_MAJOR_COMPACTION_REGION_BATCH_SIZE, batchSize);
conf.set(MobConstants.MOB_COMPACTION_TYPE_KEY, MobConstants.OPTIMIZED_MOB_COMPACTION_TYPE);
conf.setLong(MobConstants.MOB_COMPACTION_MAX_FILE_SIZE_KEY, 1000000);
HTU.startMiniHBaseCluster();
}
@Override

View File

@ -1,68 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mob;
import java.io.IOException;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Mob file compaction chore in a regular non-batch mode test. 1. Uses default (non-batch) mode for
* regular MOB compaction, 2. Disables periodic MOB compactions, sets minimum age to archive to 10
* sec 3. Creates MOB table with 20 regions 4. Loads MOB data (randomized keys, 1000 rows), flushes
* data. 5. Repeats 4. two more times 6. Verifies that we have 20 *3 = 60 mob files (equals to
* number of regions x 3) 7. Runs major MOB compaction. 8. Verifies that number of MOB files in a
* mob directory is 20 x4 = 80 9. Waits for a period of time larger than minimum age to archive 10.
* Runs Mob cleaner chore 11 Verifies that number of MOB files in a mob directory is 20. 12 Runs
* scanner and checks all 3 * 1000 rows.
*/
@SuppressWarnings("deprecation")
@Category(LargeTests.class)
public class TestMobCompactionRegularMode extends TestMobCompactionBase {
private static final Logger LOG = LoggerFactory.getLogger(TestMobCompactionRegularMode.class);
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestMobCompactionRegularMode.class);
public TestMobCompactionRegularMode() {
}
@Test
public void testMobFileCompactionBatchMode() throws InterruptedException, IOException {
LOG.info("MOB compaction regular mode started");
baseTestMobFileCompaction();
LOG.info("MOB compaction regular mode finished OK");
}
@Override
protected void mobCompact(Admin admin, HTableDescriptor hdt, HColumnDescriptor hcd)
throws IOException, InterruptedException {
// Major compact MOB table
admin.majorCompact(hdt.getTableName(), hcd.getName());
}
}

View File

@ -23,9 +23,10 @@ import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -39,6 +40,7 @@ import org.slf4j.LoggerFactory;
* to archive 10. Runs Mob cleaner chore 11 Verifies that number of MOB files in a mob directory is
* 20. 12 Runs scanner and checks all 3 * 1000 rows.
*/
@RunWith(Parameterized.class)
@Category(LargeTests.class)
public class TestMobCompactionRegularRegionBatchMode extends TestMobCompactionWithDefaults {
private static final Logger LOG =
@ -50,17 +52,18 @@ public class TestMobCompactionRegularRegionBatchMode extends TestMobCompactionWi
private static final int batchSize = 7;
private MobFileCompactionChore compactionChore;
public TestMobCompactionRegularRegionBatchMode(Boolean useFileBasedSFT) {
super(useFileBasedSFT);
}
@Before
public void setUp() throws Exception {
super.setUp();
compactionChore = new MobFileCompactionChore(conf, batchSize);
}
@BeforeClass
public static void configureCompactionBatches() throws InterruptedException, IOException {
HTU.shutdownMiniHBaseCluster();
protected void additonalConfigSetup() {
conf.setInt(MobConstants.MOB_MAJOR_COMPACTION_REGION_BATCH_SIZE, batchSize);
HTU.startMiniHBaseCluster();
}
@Override

View File

@ -23,6 +23,7 @@ import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Random;
import java.util.stream.Collectors;
@ -32,6 +33,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
@ -43,18 +45,19 @@ import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -68,6 +71,7 @@ import org.slf4j.LoggerFactory;
* Runs Mob cleaner chore 11 Verifies that number of MOB files in a mob directory is 20. 12 Runs
* scanner and checks all 3 * 1000 rows.
*/
@RunWith(Parameterized.class)
@Category(LargeTests.class)
public class TestMobCompactionWithDefaults {
private static final Logger LOG = LoggerFactory.getLogger(TestMobCompactionWithDefaults.class);
@ -97,8 +101,19 @@ public class TestMobCompactionWithDefaults {
protected MobFileCleanerChore cleanerChore;
@BeforeClass
public static void htuStart() throws Exception {
protected Boolean useFileBasedSFT;
public TestMobCompactionWithDefaults(Boolean useFileBasedSFT) {
this.useFileBasedSFT = useFileBasedSFT;
}
@Parameterized.Parameters
public static Collection<Boolean> data() {
Boolean[] data = { false, true };
return Arrays.asList(data);
}
protected void htuStart() throws Exception {
HTU = new HBaseTestingUtility();
conf = HTU.getConfiguration();
conf.setInt("hfile.format.version", 3);
@ -111,17 +126,21 @@ public class TestMobCompactionWithDefaults {
// Set compacted file discharger interval to a half minAgeToArchive
conf.setLong("hbase.hfile.compaction.discharger.interval", minAgeToArchive / 2);
conf.setBoolean("hbase.regionserver.compaction.enabled", false);
if (useFileBasedSFT) {
conf.set(StoreFileTrackerFactory.TRACKER_IMPL,
"org.apache.hadoop.hbase.regionserver.storefiletracker.FileBasedStoreFileTracker");
}
additonalConfigSetup();
HTU.startMiniCluster();
}
@AfterClass
public static void htuStop() throws Exception {
HTU.shutdownMiniCluster();
protected void additonalConfigSetup() {
}
@Before
public void setUp() throws Exception {
tableDescriptor = HTU.createModifyableTableDescriptor(test.getMethodName());
htuStart();
tableDescriptor = HTU.createModifyableTableDescriptor(TestMobUtils.getTableName(test));
admin = HTU.getAdmin();
cleanerChore = new MobFileCleanerChore();
familyDescriptor = new ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor(fam);
@ -158,6 +177,7 @@ public class TestMobCompactionWithDefaults {
public void tearDown() throws Exception {
admin.disableTable(tableDescriptor.getTableName());
admin.deleteTable(tableDescriptor.getTableName());
HTU.shutdownMiniCluster();
}
@Test
@ -173,12 +193,12 @@ public class TestMobCompactionWithDefaults {
@Test
public void testMobFileCompactionAfterSnapshotClone() throws InterruptedException, IOException {
final TableName clone = TableName.valueOf(test.getMethodName() + "-clone");
final TableName clone = TableName.valueOf(TestMobUtils.getTableName(test) + "-clone");
LOG.info("MOB compaction of cloned snapshot, " + description() + " started");
loadAndFlushThreeTimes(rows, table, famStr);
LOG.debug("Taking snapshot and cloning table {}", table);
admin.snapshot(test.getMethodName(), table);
admin.cloneSnapshot(test.getMethodName(), clone);
admin.snapshot(TestMobUtils.getTableName(test), table);
admin.cloneSnapshot(TestMobUtils.getTableName(test), clone);
assertEquals("Should have 3 hlinks per region in MOB area from snapshot clone", 3 * numRegions,
getNumberOfMobFiles(clone, famStr));
mobCompact(admin.getDescriptor(clone), familyDescriptor);
@ -191,12 +211,12 @@ public class TestMobCompactionWithDefaults {
@Test
public void testMobFileCompactionAfterSnapshotCloneAndFlush()
throws InterruptedException, IOException {
final TableName clone = TableName.valueOf(test.getMethodName() + "-clone");
final TableName clone = TableName.valueOf(TestMobUtils.getTableName(test) + "-clone");
LOG.info("MOB compaction of cloned snapshot after flush, " + description() + " started");
loadAndFlushThreeTimes(rows, table, famStr);
LOG.debug("Taking snapshot and cloning table {}", table);
admin.snapshot(test.getMethodName(), table);
admin.cloneSnapshot(test.getMethodName(), clone);
admin.snapshot(TestMobUtils.getTableName(test), table);
admin.cloneSnapshot(TestMobUtils.getTableName(test), clone);
assertEquals("Should have 3 hlinks per region in MOB area from snapshot clone", 3 * numRegions,
getNumberOfMobFiles(clone, famStr));
loadAndFlushThreeTimes(rows, clone, famStr);
@ -275,8 +295,11 @@ public class TestMobCompactionWithDefaults {
Thread.sleep(minAgeToArchive + 1000);
LOG.info("Cleaning up MOB files");
// Cleanup again
cleanerChore.cleanupObsoleteMobFiles(conf, table);
// run cleaner chore on each RS
for (ServerName sn : admin.getRegionServers()) {
HTU.getMiniHBaseCluster().getRegionServer(sn).getRSMobFileCleanerChore().chore();
}
assertEquals("After cleaning, we should have 1 MOB file per region based on size.", numRegions,
getNumberOfMobFiles(table, family));

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.mob;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
@ -32,6 +33,8 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.CompactionState;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
@ -168,14 +171,40 @@ public class TestMobFileCleanerChore {
Thread.sleep(minAgeToArchive + 1000);
LOG.info("Cleaning up MOB files");
// Cleanup again
// Cleanup
chore.cleanupObsoleteMobFiles(conf, table.getName());
// verify that nothing have happened
num = getNumberOfMobFiles(conf, table.getName(), new String(fam));
assertEquals(1, num);
assertEquals(4, num);
long scanned = scanTable();
assertEquals(30, scanned);
// add a MOB file to with a name refering to a non-existing region
ColumnFamilyDescriptor familyDescriptor = ColumnFamilyDescriptorBuilder.newBuilder(fam)
.setMobEnabled(true).setMobThreshold(mobLen).setMaxVersions(1).build();
Path extraMOBFile = MobTestUtil.generateMOBFileForRegion(conf, table.getName(),
familyDescriptor, "nonExistentRegion");
num = getNumberOfMobFiles(conf, table.getName(), new String(fam));
assertEquals(5, num);
LOG.info("Waiting for {}ms", minAgeToArchive + 1000);
Thread.sleep(minAgeToArchive + 1000);
LOG.info("Cleaning up MOB files");
chore.cleanupObsoleteMobFiles(conf, table.getName());
// check that the extra file got deleted
num = getNumberOfMobFiles(conf, table.getName(), new String(fam));
assertEquals(4, num);
FileSystem fs = FileSystem.get(conf);
assertFalse(fs.exists(extraMOBFile));
scanned = scanTable();
assertEquals(30, scanned);
}
private long getNumberOfMobFiles(Configuration conf, TableName tableName, String family)

View File

@ -26,11 +26,14 @@ import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@ -65,6 +68,7 @@ import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.RegionAsTable;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.MediumTests;
@ -78,12 +82,15 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Test mob store compaction
*/
@RunWith(Parameterized.class)
@Category(MediumTests.class)
public class TestMobStoreCompaction {
@ -108,13 +115,30 @@ public class TestMobStoreCompaction {
private final byte[] STARTROW = Bytes.toBytes(START_KEY);
private int compactionThreshold;
private Boolean useFileBasedSFT;
public TestMobStoreCompaction(Boolean useFileBasedSFT) {
this.useFileBasedSFT = useFileBasedSFT;
}
@Parameterized.Parameters
public static Collection<Boolean> data() {
Boolean[] data = { false, true };
return Arrays.asList(data);
}
private void init(Configuration conf, long mobThreshold) throws Exception {
if (useFileBasedSFT) {
conf.set(StoreFileTrackerFactory.TRACKER_IMPL,
"org.apache.hadoop.hbase.regionserver.storefiletracker.FileBasedStoreFileTracker");
}
this.conf = conf;
this.mobCellThreshold = mobThreshold;
HBaseTestingUtility UTIL = new HBaseTestingUtility(conf);
compactionThreshold = conf.getInt("hbase.hstore.compactionThreshold", 3);
htd = UTIL.createTableDescriptor(name.getMethodName());
htd = UTIL.createTableDescriptor(TestMobUtils.getTableName(name));
hcd = new HColumnDescriptor(COLUMN_FAMILY);
hcd.setMobEnabled(true);
hcd.setMobThreshold(mobThreshold);
@ -227,7 +251,7 @@ public class TestMobStoreCompaction {
Path basedir = new Path(hbaseRootDir, htd.getNameAsString());
List<Pair<byte[], String>> hfiles = new ArrayList<>(1);
for (int i = 0; i < compactionThreshold; i++) {
Path hpath = new Path(basedir, "hfile" + i);
Path hpath = new Path(basedir, UUID.randomUUID().toString().replace("-", ""));
hfiles.add(Pair.newPair(COLUMN_FAMILY, hpath.toString()));
createHFile(hpath, i, dummyData);
}

View File

@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSetMultimap;
@ -89,4 +90,8 @@ public class TestMobUtils {
assertTrue(testTable3Refs.contains("file3a"));
assertTrue(testTable3Refs.contains("file3b"));
}
public static String getTableName(TestName test) {
return test.getMethodName().replace("[", "-").replace("]", "");
}
}

View File

@ -0,0 +1,263 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mob;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.CompactionState;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Mob file cleaner chore test. 1. Creates MOB table 2. Load MOB data and flushes it N times 3. Runs
* major MOB compaction 4. Verifies that number of MOB files in a mob directory is N+1 5. Waits for
* a period of time larger than minimum age to archive 6. Runs Mob cleaner chore 7 Verifies that
* every old MOB file referenced from current RS was archived
*/
@Category(MediumTests.class)
public class TestRSMobFileCleanerChore {
private static final Logger LOG = LoggerFactory.getLogger(TestRSMobFileCleanerChore.class);
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestRSMobFileCleanerChore.class);
private HBaseTestingUtility HTU;
private final static String famStr = "f1";
private final static byte[] fam = Bytes.toBytes(famStr);
private final static byte[] qualifier = Bytes.toBytes("q1");
private final static long mobLen = 10;
private final static byte[] mobVal = Bytes
.toBytes("01234567890123456789012345678901234567890123456789012345678901234567890123456789");
private Configuration conf;
private TableDescriptorBuilder.ModifyableTableDescriptor tableDescriptor;
private ColumnFamilyDescriptor familyDescriptor;
private Admin admin;
private Table table = null;
private RSMobFileCleanerChore chore;
private long minAgeToArchive = 10000;
public TestRSMobFileCleanerChore() {
}
@Before
public void setUp() throws Exception {
HTU = new HBaseTestingUtility();
conf = HTU.getConfiguration();
initConf();
HTU.startMiniCluster();
admin = HTU.getAdmin();
familyDescriptor = ColumnFamilyDescriptorBuilder.newBuilder(fam).setMobEnabled(true)
.setMobThreshold(mobLen).setMaxVersions(1).build();
tableDescriptor =
HTU.createModifyableTableDescriptor("testMobCompactTable").setColumnFamily(familyDescriptor);
table = HTU.createTable(tableDescriptor, Bytes.toByteArrays("1"));
}
private void initConf() {
conf.setInt("hfile.format.version", 3);
conf.setLong(TimeToLiveHFileCleaner.TTL_CONF_KEY, 0);
conf.setInt("hbase.client.retries.number", 100);
conf.setInt("hbase.hregion.max.filesize", 200000000);
conf.setInt("hbase.hregion.memstore.flush.size", 800000);
conf.setInt("hbase.hstore.blockingStoreFiles", 150);
conf.setInt("hbase.hstore.compaction.throughput.lower.bound", 52428800);
conf.setInt("hbase.hstore.compaction.throughput.higher.bound", 2 * 52428800);
// conf.set(MobStoreEngine.DEFAULT_MOB_COMPACTOR_CLASS_KEY,
// FaultyMobStoreCompactor.class.getName());
// Disable automatic MOB compaction
conf.setLong(MobConstants.MOB_COMPACTION_CHORE_PERIOD, 0);
// Disable automatic MOB file cleaner chore
conf.setLong(MobConstants.MOB_CLEANER_PERIOD, 0);
// Set minimum age to archive to 10 sec
conf.setLong(MobConstants.MIN_AGE_TO_ARCHIVE_KEY, minAgeToArchive);
// Set compacted file discharger interval to a half minAgeToArchive
conf.setLong("hbase.hfile.compaction.discharger.interval", minAgeToArchive / 2);
}
private void loadData(int start, int num) {
try {
for (int i = 0; i < num; i++) {
Put p = new Put(Bytes.toBytes(start + i));
p.addColumn(fam, qualifier, mobVal);
table.put(p);
}
admin.flush(table.getName());
} catch (Exception e) {
LOG.error("MOB file cleaner chore test FAILED", e);
assertTrue(false);
}
}
@After
public void tearDown() throws Exception {
admin.disableTable(tableDescriptor.getTableName());
admin.deleteTable(tableDescriptor.getTableName());
HTU.shutdownMiniCluster();
}
@Test
public void testMobFileCleanerChore() throws InterruptedException, IOException {
loadData(0, 10);
loadData(10, 10);
// loadData(20, 10);
long num = getNumberOfMobFiles(conf, table.getName(), new String(fam));
assertEquals(2, num);
// Major compact
admin.majorCompact(tableDescriptor.getTableName(), fam);
// wait until compaction is complete
while (admin.getCompactionState(tableDescriptor.getTableName()) != CompactionState.NONE) {
Thread.sleep(100);
}
num = getNumberOfMobFiles(conf, table.getName(), new String(fam));
assertEquals(3, num);
// We have guarantee, that compcated file discharger will run during this pause
// because it has interval less than this wait time
LOG.info("Waiting for {}ms", minAgeToArchive + 1000);
Thread.sleep(minAgeToArchive + 1000);
LOG.info("Cleaning up MOB files");
ServerName serverUsed = null;
List<RegionInfo> serverRegions = null;
for (ServerName sn : admin.getRegionServers()) {
serverRegions = admin.getRegions(sn);
if (serverRegions != null && serverRegions.size() > 0) {
// filtering out non test table regions
serverRegions = serverRegions.stream().filter(r -> r.getTable() == table.getName())
.collect(Collectors.toList());
// if such one is found use this rs
if (serverRegions.size() > 0) {
serverUsed = sn;
}
break;
}
}
chore = HTU.getMiniHBaseCluster().getRegionServer(serverUsed).getRSMobFileCleanerChore();
chore.chore();
num = getNumberOfMobFiles(conf, table.getName(), new String(fam));
assertEquals(3 - serverRegions.size(), num);
long scanned = scanTable();
assertEquals(20, scanned);
// creating a MOB file not referenced from the current RS
Path extraMOBFile = MobTestUtil.generateMOBFileForRegion(conf, table.getName(),
familyDescriptor, "nonExistentRegion");
// verifying the new MOBfile is added
num = getNumberOfMobFiles(conf, table.getName(), new String(fam));
assertEquals(4 - serverRegions.size(), num);
FileSystem fs = FileSystem.get(conf);
assertTrue(fs.exists(extraMOBFile));
LOG.info("Waiting for {}ms", minAgeToArchive + 1000);
Thread.sleep(minAgeToArchive + 1000);
LOG.info("Cleaning up MOB files");
// running chore again
chore.chore();
// the chore should only archive old MOB files that were referenced from the current RS
// the unrelated MOB file is still there
num = getNumberOfMobFiles(conf, table.getName(), new String(fam));
assertEquals(4 - serverRegions.size(), num);
assertTrue(fs.exists(extraMOBFile));
scanned = scanTable();
assertEquals(20, scanned);
}
private long getNumberOfMobFiles(Configuration conf, TableName tableName, String family)
throws IOException {
FileSystem fs = FileSystem.get(conf);
Path dir = MobUtils.getMobFamilyPath(conf, tableName, family);
FileStatus[] stat = fs.listStatus(dir);
for (FileStatus st : stat) {
LOG.debug("DDDD MOB Directory content: {} size={}", st.getPath(), st.getLen());
}
LOG.debug("MOB Directory content total files: {}", stat.length);
return stat.length;
}
private long scanTable() {
try {
Result result;
ResultScanner scanner = table.getScanner(fam);
long counter = 0;
while ((result = scanner.next()) != null) {
assertTrue(Arrays.equals(result.getValue(fam, qualifier), mobVal));
counter++;
}
return counter;
} catch (Exception e) {
e.printStackTrace();
LOG.error("MOB file cleaner chore test FAILED");
if (HTU != null) {
assertTrue(false);
} else {
System.exit(-1);
}
}
return 0;
}
}