HBASE-26969:Eliminate MOB renames when SFT is enabled
Signed-off-by: Wellington Chevreuil <wchevreuil@apache.org> Signed-off-by: Peter Somogyi <psomogyi@apache.org> closes #4712
This commit is contained in:
parent
1176b484d1
commit
d073404706
|
@ -294,20 +294,20 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
|
||||||
* @param cleanSeqId When true, remove seqId(used to be mvcc) value which is <=
|
* @param cleanSeqId When true, remove seqId(used to be mvcc) value which is <=
|
||||||
* smallestReadPoint
|
* smallestReadPoint
|
||||||
* @param throughputController The compaction throughput controller.
|
* @param throughputController The compaction throughput controller.
|
||||||
* @param major Is a major compaction.
|
* @param request compaction request.
|
||||||
* @param numofFilesToCompact the number of files to compact
|
|
||||||
* @param progress Progress reporter.
|
* @param progress Progress reporter.
|
||||||
* @return Whether compaction ended; false if it was interrupted for any reason.
|
* @return Whether compaction ended; false if it was interrupted for any reason.
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
|
protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
|
||||||
long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController,
|
long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController,
|
||||||
boolean major, int numofFilesToCompact, CompactionProgress progress) throws IOException {
|
CompactionRequestImpl request, CompactionProgress progress) throws IOException {
|
||||||
long bytesWrittenProgressForLog = 0;
|
long bytesWrittenProgressForLog = 0;
|
||||||
long bytesWrittenProgressForShippedCall = 0;
|
long bytesWrittenProgressForShippedCall = 0;
|
||||||
// Clear old mob references
|
// Clear old mob references
|
||||||
mobRefSet.get().clear();
|
mobRefSet.get().clear();
|
||||||
boolean isUserRequest = userRequest.get();
|
boolean isUserRequest = userRequest.get();
|
||||||
|
boolean major = request.isAllFiles();
|
||||||
boolean compactMOBs = major && isUserRequest;
|
boolean compactMOBs = major && isUserRequest;
|
||||||
boolean discardMobMiss = conf.getBoolean(MobConstants.MOB_UNSAFE_DISCARD_MISS_KEY,
|
boolean discardMobMiss = conf.getBoolean(MobConstants.MOB_UNSAFE_DISCARD_MISS_KEY,
|
||||||
MobConstants.DEFAULT_MOB_DISCARD_MISS);
|
MobConstants.DEFAULT_MOB_DISCARD_MISS);
|
||||||
|
@ -351,12 +351,12 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
|
||||||
throughputController.start(compactionName);
|
throughputController.start(compactionName);
|
||||||
KeyValueScanner kvs = (scanner instanceof KeyValueScanner) ? (KeyValueScanner) scanner : null;
|
KeyValueScanner kvs = (scanner instanceof KeyValueScanner) ? (KeyValueScanner) scanner : null;
|
||||||
long shippedCallSizeLimit =
|
long shippedCallSizeLimit =
|
||||||
(long) numofFilesToCompact * this.store.getColumnFamilyDescriptor().getBlocksize();
|
(long) request.getFiles().size() * this.store.getColumnFamilyDescriptor().getBlocksize();
|
||||||
|
|
||||||
Cell mobCell = null;
|
Cell mobCell = null;
|
||||||
try {
|
try {
|
||||||
|
|
||||||
mobFileWriter = newMobWriter(fd, major);
|
mobFileWriter = newMobWriter(fd, major, request.getWriterCreationTracker());
|
||||||
fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
|
fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
|
||||||
|
|
||||||
do {
|
do {
|
||||||
|
@ -426,7 +426,7 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
|
||||||
LOG.debug("Closing output MOB File, length={} file={}, store={}", len,
|
LOG.debug("Closing output MOB File, length={} file={}, store={}", len,
|
||||||
mobFileWriter.getPath().getName(), getStoreInfo());
|
mobFileWriter.getPath().getName(), getStoreInfo());
|
||||||
commitOrAbortMobWriter(mobFileWriter, fd.maxSeqId, mobCells, major);
|
commitOrAbortMobWriter(mobFileWriter, fd.maxSeqId, mobCells, major);
|
||||||
mobFileWriter = newMobWriter(fd, major);
|
mobFileWriter = newMobWriter(fd, major, request.getWriterCreationTracker());
|
||||||
fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
|
fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
|
||||||
mobCells = 0;
|
mobCells = 0;
|
||||||
}
|
}
|
||||||
|
@ -470,7 +470,7 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
|
||||||
long len = mobFileWriter.getPos();
|
long len = mobFileWriter.getPos();
|
||||||
if (len > maxMobFileSize) {
|
if (len > maxMobFileSize) {
|
||||||
commitOrAbortMobWriter(mobFileWriter, fd.maxSeqId, mobCells, major);
|
commitOrAbortMobWriter(mobFileWriter, fd.maxSeqId, mobCells, major);
|
||||||
mobFileWriter = newMobWriter(fd, major);
|
mobFileWriter = newMobWriter(fd, major, request.getWriterCreationTracker());
|
||||||
fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
|
fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
|
||||||
mobCells = 0;
|
mobCells = 0;
|
||||||
}
|
}
|
||||||
|
@ -522,7 +522,7 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
|
||||||
long len = mobFileWriter.getPos();
|
long len = mobFileWriter.getPos();
|
||||||
if (len > maxMobFileSize) {
|
if (len > maxMobFileSize) {
|
||||||
commitOrAbortMobWriter(mobFileWriter, fd.maxSeqId, mobCells, major);
|
commitOrAbortMobWriter(mobFileWriter, fd.maxSeqId, mobCells, major);
|
||||||
mobFileWriter = newMobWriter(fd, major);
|
mobFileWriter = newMobWriter(fd, major, request.getWriterCreationTracker());
|
||||||
fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
|
fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
|
||||||
mobCells = 0;
|
mobCells = 0;
|
||||||
}
|
}
|
||||||
|
@ -608,11 +608,16 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private StoreFileWriter newMobWriter(FileDetails fd, boolean major) throws IOException {
|
private StoreFileWriter newMobWriter(FileDetails fd, boolean major,
|
||||||
|
Consumer<Path> writerCreationTracker) throws IOException {
|
||||||
try {
|
try {
|
||||||
StoreFileWriter mobFileWriter = mobStore.createWriterInTmp(new Date(fd.latestPutTs),
|
StoreFileWriter mobFileWriter = mobStore.getStoreEngine().requireWritingToTmpDirFirst()
|
||||||
fd.maxKeyCount, major ? majorCompactionCompression : minorCompactionCompression,
|
? mobStore.createWriterInTmp(new Date(fd.latestPutTs), fd.maxKeyCount,
|
||||||
store.getRegionInfo().getStartKey(), true);
|
major ? majorCompactionCompression : minorCompactionCompression,
|
||||||
|
store.getRegionInfo().getStartKey(), true)
|
||||||
|
: mobStore.createWriter(new Date(fd.latestPutTs), fd.maxKeyCount,
|
||||||
|
major ? majorCompactionCompression : minorCompactionCompression,
|
||||||
|
store.getRegionInfo().getStartKey(), true, writerCreationTracker);
|
||||||
LOG.debug("New MOB writer created={} store={}", mobFileWriter.getPath().getName(),
|
LOG.debug("New MOB writer created={} store={}", mobFileWriter.getPath().getName(),
|
||||||
getStoreInfo());
|
getStoreInfo());
|
||||||
// Add reference we get for compact MOB
|
// Add reference we get for compact MOB
|
||||||
|
|
|
@ -127,7 +127,8 @@ public class DefaultMobStoreFlusher extends DefaultStoreFlusher {
|
||||||
try {
|
try {
|
||||||
// It's a mob store, flush the cells in a mob way. This is the difference of flushing
|
// It's a mob store, flush the cells in a mob way. This is the difference of flushing
|
||||||
// between a normal and a mob store.
|
// between a normal and a mob store.
|
||||||
performMobFlush(snapshot, cacheFlushId, scanner, writer, status, throughputController);
|
performMobFlush(snapshot, cacheFlushId, scanner, writer, status, throughputController,
|
||||||
|
writerCreationTracker);
|
||||||
} catch (IOException ioe) {
|
} catch (IOException ioe) {
|
||||||
e = ioe;
|
e = ioe;
|
||||||
// throw the exception out
|
// throw the exception out
|
||||||
|
@ -171,16 +172,21 @@ public class DefaultMobStoreFlusher extends DefaultStoreFlusher {
|
||||||
*/
|
*/
|
||||||
protected void performMobFlush(MemStoreSnapshot snapshot, long cacheFlushId,
|
protected void performMobFlush(MemStoreSnapshot snapshot, long cacheFlushId,
|
||||||
InternalScanner scanner, StoreFileWriter writer, MonitoredTask status,
|
InternalScanner scanner, StoreFileWriter writer, MonitoredTask status,
|
||||||
ThroughputController throughputController) throws IOException {
|
ThroughputController throughputController, Consumer<Path> writerCreationTracker)
|
||||||
|
throws IOException {
|
||||||
StoreFileWriter mobFileWriter = null;
|
StoreFileWriter mobFileWriter = null;
|
||||||
int compactionKVMax =
|
int compactionKVMax =
|
||||||
conf.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT);
|
conf.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT);
|
||||||
long mobCount = 0;
|
long mobCount = 0;
|
||||||
long mobSize = 0;
|
long mobSize = 0;
|
||||||
long time = snapshot.getTimeRangeTracker().getMax();
|
long time = snapshot.getTimeRangeTracker().getMax();
|
||||||
mobFileWriter = mobStore.createWriterInTmp(new Date(time), snapshot.getCellsCount(),
|
mobFileWriter = mobStore.getStoreEngine().requireWritingToTmpDirFirst()
|
||||||
store.getColumnFamilyDescriptor().getCompressionType(), store.getRegionInfo().getStartKey(),
|
? mobStore.createWriterInTmp(new Date(time), snapshot.getCellsCount(),
|
||||||
false);
|
store.getColumnFamilyDescriptor().getCompressionType(), store.getRegionInfo().getStartKey(),
|
||||||
|
false)
|
||||||
|
: mobStore.createWriter(new Date(time), snapshot.getCellsCount(),
|
||||||
|
store.getColumnFamilyDescriptor().getCompressionType(), store.getRegionInfo().getStartKey(),
|
||||||
|
false, writerCreationTracker);
|
||||||
// the target path is {tableName}/.mob/{cfName}/mobFiles
|
// the target path is {tableName}/.mob/{cfName}/mobFiles
|
||||||
// the relative path is mobFiles
|
// the relative path is mobFiles
|
||||||
byte[] fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
|
byte[] fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
|
||||||
|
|
|
@ -168,14 +168,15 @@ public class MobFileCleanerChore extends ScheduledChore {
|
||||||
maxCreationTimeToArchive, table);
|
maxCreationTimeToArchive, table);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
FileSystem fs = FileSystem.get(conf);
|
||||||
|
Set<String> regionNames = new HashSet<>();
|
||||||
Path rootDir = CommonFSUtils.getRootDir(conf);
|
Path rootDir = CommonFSUtils.getRootDir(conf);
|
||||||
Path tableDir = CommonFSUtils.getTableDir(rootDir, table);
|
Path tableDir = CommonFSUtils.getTableDir(rootDir, table);
|
||||||
// How safe is this call?
|
List<Path> regionDirs = FSUtils.getRegionDirs(fs, tableDir);
|
||||||
List<Path> regionDirs = FSUtils.getRegionDirs(FileSystem.get(conf), tableDir);
|
|
||||||
|
|
||||||
Set<String> allActiveMobFileName = new HashSet<String>();
|
Set<String> allActiveMobFileName = new HashSet<String>();
|
||||||
FileSystem fs = FileSystem.get(conf);
|
|
||||||
for (Path regionPath : regionDirs) {
|
for (Path regionPath : regionDirs) {
|
||||||
|
regionNames.add(regionPath.getName());
|
||||||
for (ColumnFamilyDescriptor hcd : list) {
|
for (ColumnFamilyDescriptor hcd : list) {
|
||||||
String family = hcd.getNameAsString();
|
String family = hcd.getNameAsString();
|
||||||
Path storePath = new Path(regionPath, family);
|
Path storePath = new Path(regionPath, family);
|
||||||
|
@ -203,13 +204,26 @@ public class MobFileCleanerChore extends ScheduledChore {
|
||||||
for (Path pp : storeFiles) {
|
for (Path pp : storeFiles) {
|
||||||
currentPath = pp;
|
currentPath = pp;
|
||||||
LOG.trace("Store file: {}", pp);
|
LOG.trace("Store file: {}", pp);
|
||||||
HStoreFile sf =
|
HStoreFile sf = null;
|
||||||
new HStoreFile(fs, pp, conf, CacheConfig.DISABLED, BloomType.NONE, true);
|
byte[] mobRefData = null;
|
||||||
sf.initReader();
|
byte[] bulkloadMarkerData = null;
|
||||||
byte[] mobRefData = sf.getMetadataValue(HStoreFile.MOB_FILE_REFS);
|
try {
|
||||||
byte[] bulkloadMarkerData = sf.getMetadataValue(HStoreFile.BULKLOAD_TASK_KEY);
|
sf = new HStoreFile(fs, pp, conf, CacheConfig.DISABLED, BloomType.NONE, true);
|
||||||
// close store file to avoid memory leaks
|
sf.initReader();
|
||||||
sf.closeStoreFile(true);
|
mobRefData = sf.getMetadataValue(HStoreFile.MOB_FILE_REFS);
|
||||||
|
bulkloadMarkerData = sf.getMetadataValue(HStoreFile.BULKLOAD_TASK_KEY);
|
||||||
|
// close store file to avoid memory leaks
|
||||||
|
sf.closeStoreFile(true);
|
||||||
|
} catch (IOException ex) {
|
||||||
|
// When FileBased SFT is active the store dir can contain corrupted or incomplete
|
||||||
|
// files. So read errors are expected. We just skip these files.
|
||||||
|
if (ex instanceof FileNotFoundException) {
|
||||||
|
throw ex;
|
||||||
|
}
|
||||||
|
LOG.debug("Failed to get mob data from file: {} due to error.", pp.toString(),
|
||||||
|
ex);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
if (mobRefData == null) {
|
if (mobRefData == null) {
|
||||||
if (bulkloadMarkerData == null) {
|
if (bulkloadMarkerData == null) {
|
||||||
LOG.warn("Found old store file with no MOB_FILE_REFS: {} - "
|
LOG.warn("Found old store file with no MOB_FILE_REFS: {} - "
|
||||||
|
@ -264,9 +278,11 @@ public class MobFileCleanerChore extends ScheduledChore {
|
||||||
while (rit.hasNext()) {
|
while (rit.hasNext()) {
|
||||||
LocatedFileStatus lfs = rit.next();
|
LocatedFileStatus lfs = rit.next();
|
||||||
Path p = lfs.getPath();
|
Path p = lfs.getPath();
|
||||||
if (!allActiveMobFileName.contains(p.getName())) {
|
String[] mobParts = p.getName().split("_");
|
||||||
// MOB is not in a list of active references, but it can be too
|
String regionName = mobParts[mobParts.length - 1];
|
||||||
// fresh, skip it in this case
|
|
||||||
|
if (!regionNames.contains(regionName)) {
|
||||||
|
// MOB belonged to a region no longer hosted
|
||||||
long creationTime = fs.getFileStatus(p).getModificationTime();
|
long creationTime = fs.getFileStatus(p).getModificationTime();
|
||||||
if (creationTime < maxCreationTimeToArchive) {
|
if (creationTime < maxCreationTimeToArchive) {
|
||||||
LOG.trace("Archiving MOB file {} creation time={}", p,
|
LOG.trace("Archiving MOB file {} creation time={}", p,
|
||||||
|
@ -277,7 +293,7 @@ public class MobFileCleanerChore extends ScheduledChore {
|
||||||
fs.getFileStatus(p).getModificationTime());
|
fs.getFileStatus(p).getModificationTime());
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
LOG.trace("Keeping active MOB file: {}", p);
|
LOG.trace("Keeping MOB file with existing region: {}", p);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
LOG.info(" MOB Cleaner found {} files to archive for table={} family={}", toArchive.size(),
|
LOG.info(" MOB Cleaner found {} files to archive for table={} family={}", toArchive.size(),
|
||||||
|
|
|
@ -33,6 +33,7 @@ import java.util.Date;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
import java.util.function.Consumer;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
@ -584,6 +585,33 @@ public final class MobUtils {
|
||||||
CacheConfig cacheConfig, Encryption.Context cryptoContext, ChecksumType checksumType,
|
CacheConfig cacheConfig, Encryption.Context cryptoContext, ChecksumType checksumType,
|
||||||
int bytesPerChecksum, int blocksize, BloomType bloomType, boolean isCompaction)
|
int bytesPerChecksum, int blocksize, BloomType bloomType, boolean isCompaction)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
return createWriter(conf, fs, family, path, maxKeyCount, compression, cacheConfig,
|
||||||
|
cryptoContext, checksumType, bytesPerChecksum, blocksize, bloomType, isCompaction, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a writer for the mob file in temp directory.
|
||||||
|
* @param conf The current configuration.
|
||||||
|
* @param fs The current file system.
|
||||||
|
* @param family The descriptor of the current column family.
|
||||||
|
* @param path The path for a temp directory.
|
||||||
|
* @param maxKeyCount The key count.
|
||||||
|
* @param compression The compression algorithm.
|
||||||
|
* @param cacheConfig The current cache config.
|
||||||
|
* @param cryptoContext The encryption context.
|
||||||
|
* @param checksumType The checksum type.
|
||||||
|
* @param bytesPerChecksum The bytes per checksum.
|
||||||
|
* @param blocksize The HFile block size.
|
||||||
|
* @param bloomType The bloom filter type.
|
||||||
|
* @param isCompaction If the writer is used in compaction.
|
||||||
|
* @param writerCreationTracker to track the current writer in the store
|
||||||
|
* @return The writer for the mob file.
|
||||||
|
*/
|
||||||
|
public static StoreFileWriter createWriter(Configuration conf, FileSystem fs,
|
||||||
|
ColumnFamilyDescriptor family, Path path, long maxKeyCount, Compression.Algorithm compression,
|
||||||
|
CacheConfig cacheConfig, Encryption.Context cryptoContext, ChecksumType checksumType,
|
||||||
|
int bytesPerChecksum, int blocksize, BloomType bloomType, boolean isCompaction,
|
||||||
|
Consumer<Path> writerCreationTracker) throws IOException {
|
||||||
if (compression == null) {
|
if (compression == null) {
|
||||||
compression = HFile.DEFAULT_COMPRESSION_ALGORITHM;
|
compression = HFile.DEFAULT_COMPRESSION_ALGORITHM;
|
||||||
}
|
}
|
||||||
|
@ -602,7 +630,8 @@ public final class MobUtils {
|
||||||
.withCreateTime(EnvironmentEdgeManager.currentTime()).build();
|
.withCreateTime(EnvironmentEdgeManager.currentTime()).build();
|
||||||
|
|
||||||
StoreFileWriter w = new StoreFileWriter.Builder(conf, writerCacheConf, fs).withFilePath(path)
|
StoreFileWriter w = new StoreFileWriter.Builder(conf, writerCacheConf, fs).withFilePath(path)
|
||||||
.withBloomType(bloomType).withMaxKeyCount(maxKeyCount).withFileContext(hFileContext).build();
|
.withBloomType(bloomType).withMaxKeyCount(maxKeyCount).withFileContext(hFileContext)
|
||||||
|
.withWriterCreationTracker(writerCreationTracker).build();
|
||||||
return w;
|
return w;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,272 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.mob;
|
||||||
|
|
||||||
|
import java.io.FileNotFoundException;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.ThreadLocalRandom;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.LocatedFileStatus;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.fs.RemoteIterator;
|
||||||
|
import org.apache.hadoop.hbase.ScheduledChore;
|
||||||
|
import org.apache.hadoop.hbase.TableDescriptors;
|
||||||
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.backup.HFileArchiver;
|
||||||
|
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
|
||||||
|
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.HStore;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.HStoreFile;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.hbase.util.CommonFSUtils;
|
||||||
|
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||||
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import org.apache.hbase.thirdparty.com.google.common.collect.SetMultimap;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The class RSMobFileCleanerChore for running cleaner regularly to remove the obsolete (files which
|
||||||
|
* have no active references to) mob files that were referenced from the current RS.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
public class RSMobFileCleanerChore extends ScheduledChore {
|
||||||
|
|
||||||
|
private static final Logger LOG = LoggerFactory.getLogger(RSMobFileCleanerChore.class);
|
||||||
|
private final HRegionServer rs;
|
||||||
|
|
||||||
|
public RSMobFileCleanerChore(HRegionServer rs) {
|
||||||
|
super(rs.getServerName() + "-MobFileCleanerChore", rs,
|
||||||
|
rs.getConfiguration().getInt(MobConstants.MOB_CLEANER_PERIOD,
|
||||||
|
MobConstants.DEFAULT_MOB_CLEANER_PERIOD),
|
||||||
|
Math.round(rs.getConfiguration().getInt(MobConstants.MOB_CLEANER_PERIOD,
|
||||||
|
MobConstants.DEFAULT_MOB_CLEANER_PERIOD)
|
||||||
|
* ((ThreadLocalRandom.current().nextDouble() + 0.5D))),
|
||||||
|
TimeUnit.SECONDS);
|
||||||
|
// to prevent a load spike on the fs the initial delay is modified by +/- 50%
|
||||||
|
this.rs = rs;
|
||||||
|
}
|
||||||
|
|
||||||
|
public RSMobFileCleanerChore() {
|
||||||
|
this.rs = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void chore() {
|
||||||
|
|
||||||
|
long minAgeToArchive = rs.getConfiguration().getLong(MobConstants.MIN_AGE_TO_ARCHIVE_KEY,
|
||||||
|
MobConstants.DEFAULT_MIN_AGE_TO_ARCHIVE);
|
||||||
|
// We check only those MOB files, which creation time is less
|
||||||
|
// than maxCreationTimeToArchive. This is a current time - 1h. 1 hour gap
|
||||||
|
// gives us full confidence that all corresponding store files will
|
||||||
|
// exist at the time cleaning procedure begins and will be examined.
|
||||||
|
// So, if MOB file creation time is greater than this maxTimeToArchive,
|
||||||
|
// this will be skipped and won't be archived.
|
||||||
|
long maxCreationTimeToArchive = EnvironmentEdgeManager.currentTime() - minAgeToArchive;
|
||||||
|
|
||||||
|
TableDescriptors htds = rs.getTableDescriptors();
|
||||||
|
try {
|
||||||
|
FileSystem fs = FileSystem.get(rs.getConfiguration());
|
||||||
|
|
||||||
|
Map<String, TableDescriptor> map = null;
|
||||||
|
try {
|
||||||
|
map = htds.getAll();
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.error("MobFileCleanerChore failed", e);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
Map<String, Map<String, List<String>>> referencedMOBs = new HashMap<>();
|
||||||
|
for (TableDescriptor htd : map.values()) {
|
||||||
|
// Now clean obsolete files for a table
|
||||||
|
LOG.info("Cleaning obsolete MOB files from table={}", htd.getTableName());
|
||||||
|
List<ColumnFamilyDescriptor> list = MobUtils.getMobColumnFamilies(htd);
|
||||||
|
List<HRegion> regions = rs.getRegions(htd.getTableName());
|
||||||
|
for (HRegion region : regions) {
|
||||||
|
for (ColumnFamilyDescriptor hcd : list) {
|
||||||
|
HStore store = region.getStore(hcd.getName());
|
||||||
|
Collection<HStoreFile> sfs = store.getStorefiles();
|
||||||
|
Set<String> regionMobs = new HashSet<String>();
|
||||||
|
Path currentPath = null;
|
||||||
|
try {
|
||||||
|
// collectinng referenced MOBs
|
||||||
|
for (HStoreFile sf : sfs) {
|
||||||
|
currentPath = sf.getPath();
|
||||||
|
sf.initReader();
|
||||||
|
byte[] mobRefData = sf.getMetadataValue(HStoreFile.MOB_FILE_REFS);
|
||||||
|
byte[] bulkloadMarkerData = sf.getMetadataValue(HStoreFile.BULKLOAD_TASK_KEY);
|
||||||
|
// close store file to avoid memory leaks
|
||||||
|
sf.closeStoreFile(true);
|
||||||
|
if (mobRefData == null) {
|
||||||
|
if (bulkloadMarkerData == null) {
|
||||||
|
LOG.warn(
|
||||||
|
"Found old store file with no MOB_FILE_REFS: {} - "
|
||||||
|
+ "can not proceed until all old files will be MOB-compacted.",
|
||||||
|
currentPath);
|
||||||
|
return;
|
||||||
|
} else {
|
||||||
|
LOG.debug("Skipping file without MOB references (bulkloaded file):{}",
|
||||||
|
currentPath);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// file may or may not have MOB references, but was created by the distributed
|
||||||
|
// mob compaction code.
|
||||||
|
try {
|
||||||
|
SetMultimap<TableName, String> mobs =
|
||||||
|
MobUtils.deserializeMobFileRefs(mobRefData).build();
|
||||||
|
LOG.debug("Found {} mob references for store={}", mobs.size(), sf);
|
||||||
|
LOG.trace("Specific mob references found for store={} : {}", sf, mobs);
|
||||||
|
regionMobs.addAll(mobs.values());
|
||||||
|
} catch (RuntimeException exception) {
|
||||||
|
throw new IOException("failure getting mob references for hfile " + sf,
|
||||||
|
exception);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// collecting files, MOB included currently being written
|
||||||
|
regionMobs.addAll(store.getStoreFilesBeingWritten().stream()
|
||||||
|
.map(path -> path.getName()).collect(Collectors.toList()));
|
||||||
|
|
||||||
|
referencedMOBs
|
||||||
|
.computeIfAbsent(hcd.getNameAsString(), cf -> new HashMap<String, List<String>>())
|
||||||
|
.computeIfAbsent(region.getRegionInfo().getEncodedName(), name -> new ArrayList<>())
|
||||||
|
.addAll(regionMobs);
|
||||||
|
|
||||||
|
} catch (FileNotFoundException e) {
|
||||||
|
LOG.warn(
|
||||||
|
"Missing file:{} Starting MOB cleaning cycle from the beginning" + " due to error",
|
||||||
|
currentPath, e);
|
||||||
|
regionMobs.clear();
|
||||||
|
continue;
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.error("Failed to clean the obsolete mob files for table={}",
|
||||||
|
htd.getTableName().getNameAsString(), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Found: {} active mob refs for table={}",
|
||||||
|
referencedMOBs.values().stream().map(inner -> inner.values())
|
||||||
|
.flatMap(lists -> lists.stream()).mapToInt(lists -> lists.size()).sum(),
|
||||||
|
htd.getTableName().getNameAsString());
|
||||||
|
}
|
||||||
|
if (LOG.isTraceEnabled()) {
|
||||||
|
referencedMOBs.values().stream().forEach(innerMap -> innerMap.values().stream()
|
||||||
|
.forEach(mobFileList -> mobFileList.stream().forEach(LOG::trace)));
|
||||||
|
}
|
||||||
|
|
||||||
|
// collect regions referencing MOB files belonging to the current rs
|
||||||
|
Set<String> regionsCovered = new HashSet<>();
|
||||||
|
referencedMOBs.values().stream()
|
||||||
|
.forEach(regionMap -> regionsCovered.addAll(regionMap.keySet()));
|
||||||
|
|
||||||
|
for (ColumnFamilyDescriptor hcd : list) {
|
||||||
|
List<Path> toArchive = new ArrayList<Path>();
|
||||||
|
String family = hcd.getNameAsString();
|
||||||
|
Path dir = MobUtils.getMobFamilyPath(rs.getConfiguration(), htd.getTableName(), family);
|
||||||
|
RemoteIterator<LocatedFileStatus> rit = fs.listLocatedStatus(dir);
|
||||||
|
while (rit.hasNext()) {
|
||||||
|
LocatedFileStatus lfs = rit.next();
|
||||||
|
Path p = lfs.getPath();
|
||||||
|
String[] mobParts = p.getName().split("_");
|
||||||
|
String regionName = mobParts[mobParts.length - 1];
|
||||||
|
|
||||||
|
// skip MOB files not belonging to a region assigned to the current rs
|
||||||
|
if (!regionsCovered.contains(regionName)) {
|
||||||
|
LOG.trace("MOB file does not belong to current rs: {}", p);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// check active or actively written mob files
|
||||||
|
Map<String, List<String>> cfMobs = referencedMOBs.get(hcd.getNameAsString());
|
||||||
|
if (
|
||||||
|
cfMobs != null && cfMobs.get(regionName) != null
|
||||||
|
&& cfMobs.get(regionName).contains(p.getName())
|
||||||
|
) {
|
||||||
|
LOG.trace("Keeping active MOB file: {}", p);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// MOB is not in a list of active references, but it can be too
|
||||||
|
// fresh, skip it in this case
|
||||||
|
long creationTime = fs.getFileStatus(p).getModificationTime();
|
||||||
|
if (creationTime < maxCreationTimeToArchive) {
|
||||||
|
LOG.trace("Archiving MOB file {} creation time={}", p,
|
||||||
|
(fs.getFileStatus(p).getModificationTime()));
|
||||||
|
toArchive.add(p);
|
||||||
|
} else {
|
||||||
|
LOG.trace("Skipping fresh file: {}. Creation time={}", p,
|
||||||
|
fs.getFileStatus(p).getModificationTime());
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
LOG.info(" MOB Cleaner found {} files to archive for table={} family={}",
|
||||||
|
toArchive.size(), htd.getTableName().getNameAsString(), family);
|
||||||
|
archiveMobFiles(rs.getConfiguration(), htd.getTableName(), family.getBytes(), toArchive);
|
||||||
|
LOG.info(" MOB Cleaner archived {} files, table={} family={}", toArchive.size(),
|
||||||
|
htd.getTableName().getNameAsString(), family);
|
||||||
|
}
|
||||||
|
|
||||||
|
LOG.info("Cleaning obsolete MOB files finished for table={}", htd.getTableName());
|
||||||
|
|
||||||
|
}
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.error("MOB Cleaner failed when trying to access the file system", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Archives the mob files.
|
||||||
|
* @param conf The current configuration.
|
||||||
|
* @param tableName The table name.
|
||||||
|
* @param family The name of the column family.
|
||||||
|
* @param storeFiles The files to be archived.
|
||||||
|
* @throws IOException exception
|
||||||
|
*/
|
||||||
|
public void archiveMobFiles(Configuration conf, TableName tableName, byte[] family,
|
||||||
|
List<Path> storeFiles) throws IOException {
|
||||||
|
|
||||||
|
if (storeFiles.size() == 0) {
|
||||||
|
// nothing to remove
|
||||||
|
LOG.debug("Skipping archiving old MOB files - no files found for table={} cf={}", tableName,
|
||||||
|
Bytes.toString(family));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
Path mobTableDir = CommonFSUtils.getTableDir(MobUtils.getMobHome(conf), tableName);
|
||||||
|
FileSystem fs = storeFiles.get(0).getFileSystem(conf);
|
||||||
|
|
||||||
|
for (Path p : storeFiles) {
|
||||||
|
LOG.debug("MOB Cleaner is archiving: {}", p);
|
||||||
|
HFileArchiver.archiveStoreFile(conf, fs, MobUtils.getMobRegionInfo(tableName), mobTableDir,
|
||||||
|
family, p);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -28,6 +28,7 @@ import java.util.Optional;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
import java.util.function.Consumer;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
@ -184,7 +185,27 @@ public class HMobStore extends HStore {
|
||||||
}
|
}
|
||||||
Path path = getTempDir();
|
Path path = getTempDir();
|
||||||
return createWriterInTmp(MobUtils.formatDate(date), path, maxKeyCount, compression, startKey,
|
return createWriterInTmp(MobUtils.formatDate(date), path, maxKeyCount, compression, startKey,
|
||||||
isCompaction);
|
isCompaction, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates the writer for the mob file in the mob family directory.
|
||||||
|
* @param date The latest date of written cells.
|
||||||
|
* @param maxKeyCount The key count.
|
||||||
|
* @param compression The compression algorithm.
|
||||||
|
* @param startKey The start key.
|
||||||
|
* @param isCompaction If the writer is used in compaction.
|
||||||
|
* @return The writer for the mob file. n
|
||||||
|
*/
|
||||||
|
public StoreFileWriter createWriter(Date date, long maxKeyCount,
|
||||||
|
Compression.Algorithm compression, byte[] startKey, boolean isCompaction,
|
||||||
|
Consumer<Path> writerCreationTracker) throws IOException {
|
||||||
|
if (startKey == null) {
|
||||||
|
startKey = HConstants.EMPTY_START_ROW;
|
||||||
|
}
|
||||||
|
Path path = getPath();
|
||||||
|
return createWriterInTmp(MobUtils.formatDate(date), path, maxKeyCount, compression, startKey,
|
||||||
|
isCompaction, writerCreationTracker);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -198,11 +219,13 @@ public class HMobStore extends HStore {
|
||||||
* @return The writer for the mob file. n
|
* @return The writer for the mob file. n
|
||||||
*/
|
*/
|
||||||
public StoreFileWriter createWriterInTmp(String date, Path basePath, long maxKeyCount,
|
public StoreFileWriter createWriterInTmp(String date, Path basePath, long maxKeyCount,
|
||||||
Compression.Algorithm compression, byte[] startKey, boolean isCompaction) throws IOException {
|
Compression.Algorithm compression, byte[] startKey, boolean isCompaction,
|
||||||
|
Consumer<Path> writerCreationTracker) throws IOException {
|
||||||
MobFileName mobFileName =
|
MobFileName mobFileName =
|
||||||
MobFileName.create(startKey, date, UUID.randomUUID().toString().replaceAll("-", ""),
|
MobFileName.create(startKey, date, UUID.randomUUID().toString().replaceAll("-", ""),
|
||||||
getHRegion().getRegionInfo().getEncodedName());
|
getHRegion().getRegionInfo().getEncodedName());
|
||||||
return createWriterInTmp(mobFileName, basePath, maxKeyCount, compression, isCompaction);
|
return createWriterInTmp(mobFileName, basePath, maxKeyCount, compression, isCompaction,
|
||||||
|
writerCreationTracker);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -214,13 +237,15 @@ public class HMobStore extends HStore {
|
||||||
* @param isCompaction If the writer is used in compaction.
|
* @param isCompaction If the writer is used in compaction.
|
||||||
* @return The writer for the mob file. n
|
* @return The writer for the mob file. n
|
||||||
*/
|
*/
|
||||||
|
|
||||||
public StoreFileWriter createWriterInTmp(MobFileName mobFileName, Path basePath, long maxKeyCount,
|
public StoreFileWriter createWriterInTmp(MobFileName mobFileName, Path basePath, long maxKeyCount,
|
||||||
Compression.Algorithm compression, boolean isCompaction) throws IOException {
|
Compression.Algorithm compression, boolean isCompaction, Consumer<Path> writerCreationTracker)
|
||||||
|
throws IOException {
|
||||||
return MobUtils.createWriter(conf, getFileSystem(), getColumnFamilyDescriptor(),
|
return MobUtils.createWriter(conf, getFileSystem(), getColumnFamilyDescriptor(),
|
||||||
new Path(basePath, mobFileName.getFileName()), maxKeyCount, compression, getCacheConfig(),
|
new Path(basePath, mobFileName.getFileName()), maxKeyCount, compression, getCacheConfig(),
|
||||||
getStoreContext().getEncryptionContext(), StoreUtils.getChecksumType(conf),
|
getStoreContext().getEncryptionContext(), StoreUtils.getChecksumType(conf),
|
||||||
StoreUtils.getBytesPerChecksum(conf), getStoreContext().getBlockSize(), BloomType.NONE,
|
StoreUtils.getBytesPerChecksum(conf), getStoreContext().getBlockSize(), BloomType.NONE,
|
||||||
isCompaction);
|
isCompaction, writerCreationTracker);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -234,6 +259,10 @@ public class HMobStore extends HStore {
|
||||||
}
|
}
|
||||||
Path dstPath = new Path(targetPath, sourceFile.getName());
|
Path dstPath = new Path(targetPath, sourceFile.getName());
|
||||||
validateMobFile(sourceFile);
|
validateMobFile(sourceFile);
|
||||||
|
if (sourceFile.equals(targetPath)) {
|
||||||
|
LOG.info("File is already in the destination dir: {}", sourceFile);
|
||||||
|
return;
|
||||||
|
}
|
||||||
LOG.info(" FLUSH Renaming flushed file from {} to {}", sourceFile, dstPath);
|
LOG.info(" FLUSH Renaming flushed file from {} to {}", sourceFile, dstPath);
|
||||||
Path parent = dstPath.getParent();
|
Path parent = dstPath.getParent();
|
||||||
if (!getFileSystem().exists(parent)) {
|
if (!getFileSystem().exists(parent)) {
|
||||||
|
|
|
@ -131,6 +131,7 @@ import org.apache.hadoop.hbase.master.HMaster;
|
||||||
import org.apache.hadoop.hbase.master.LoadBalancer;
|
import org.apache.hadoop.hbase.master.LoadBalancer;
|
||||||
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
|
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
|
||||||
import org.apache.hadoop.hbase.mob.MobFileCache;
|
import org.apache.hadoop.hbase.mob.MobFileCache;
|
||||||
|
import org.apache.hadoop.hbase.mob.RSMobFileCleanerChore;
|
||||||
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
|
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
|
||||||
import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
|
import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
|
||||||
import org.apache.hadoop.hbase.namequeues.SlowLogTableOpsChore;
|
import org.apache.hadoop.hbase.namequeues.SlowLogTableOpsChore;
|
||||||
|
@ -553,6 +554,8 @@ public class HRegionServer extends Thread
|
||||||
|
|
||||||
private BrokenStoreFileCleaner brokenStoreFileCleaner;
|
private BrokenStoreFileCleaner brokenStoreFileCleaner;
|
||||||
|
|
||||||
|
private RSMobFileCleanerChore rsMobFileCleanerChore;
|
||||||
|
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
CompactedHFilesDischarger compactedFileDischarger;
|
CompactedHFilesDischarger compactedFileDischarger;
|
||||||
|
|
||||||
|
@ -2203,6 +2206,10 @@ public class HRegionServer extends Thread
|
||||||
choreService.scheduleChore(brokenStoreFileCleaner);
|
choreService.scheduleChore(brokenStoreFileCleaner);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (this.rsMobFileCleanerChore != null) {
|
||||||
|
choreService.scheduleChore(rsMobFileCleanerChore);
|
||||||
|
}
|
||||||
|
|
||||||
// Leases is not a Thread. Internally it runs a daemon thread. If it gets
|
// Leases is not a Thread. Internally it runs a daemon thread. If it gets
|
||||||
// an unhandled exception, it will just exit.
|
// an unhandled exception, it will just exit.
|
||||||
Threads.setDaemonThreadRunning(this.leaseManager, getName() + ".leaseChecker",
|
Threads.setDaemonThreadRunning(this.leaseManager, getName() + ".leaseChecker",
|
||||||
|
@ -2299,6 +2306,8 @@ public class HRegionServer extends Thread
|
||||||
new BrokenStoreFileCleaner((int) (brokenStoreFileCleanerDelay + jitterValue),
|
new BrokenStoreFileCleaner((int) (brokenStoreFileCleanerDelay + jitterValue),
|
||||||
brokenStoreFileCleanerPeriod, this, conf, this);
|
brokenStoreFileCleanerPeriod, this, conf, this);
|
||||||
|
|
||||||
|
this.rsMobFileCleanerChore = new RSMobFileCleanerChore(this);
|
||||||
|
|
||||||
registerConfigurationObservers();
|
registerConfigurationObservers();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2781,6 +2790,7 @@ public class HRegionServer extends Thread
|
||||||
shutdownChore(storefileRefresher);
|
shutdownChore(storefileRefresher);
|
||||||
shutdownChore(fsUtilizationChore);
|
shutdownChore(fsUtilizationChore);
|
||||||
shutdownChore(slowLogTableOpsChore);
|
shutdownChore(slowLogTableOpsChore);
|
||||||
|
shutdownChore(rsMobFileCleanerChore);
|
||||||
// cancel the remaining scheduled chores (in case we missed out any)
|
// cancel the remaining scheduled chores (in case we missed out any)
|
||||||
// TODO: cancel will not cleanup the chores, so we need make sure we do not miss any
|
// TODO: cancel will not cleanup the chores, so we need make sure we do not miss any
|
||||||
choreService.shutdown();
|
choreService.shutdown();
|
||||||
|
@ -4056,6 +4066,11 @@ public class HRegionServer extends Thread
|
||||||
return brokenStoreFileCleaner;
|
return brokenStoreFileCleaner;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
public RSMobFileCleanerChore getRSMobFileCleanerChore() {
|
||||||
|
return rsMobFileCleanerChore;
|
||||||
|
}
|
||||||
|
|
||||||
RSSnapshotVerifier getRsSnapshotVerifier() {
|
RSSnapshotVerifier getRsSnapshotVerifier() {
|
||||||
return rsSnapshotVerifier;
|
return rsSnapshotVerifier;
|
||||||
}
|
}
|
||||||
|
|
|
@ -2421,7 +2421,7 @@ public class HStore
|
||||||
* {@link BrokenStoreFileCleaner} to prevent deleting the these files as they are not present in
|
* {@link BrokenStoreFileCleaner} to prevent deleting the these files as they are not present in
|
||||||
* SFT yet.
|
* SFT yet.
|
||||||
*/
|
*/
|
||||||
Set<Path> getStoreFilesBeingWritten() {
|
public Set<Path> getStoreFilesBeingWritten() {
|
||||||
return storeFileWriterCreationTrackers.stream().flatMap(t -> t.get().stream())
|
return storeFileWriterCreationTrackers.stream().flatMap(t -> t.get().stream())
|
||||||
.collect(Collectors.toSet());
|
.collect(Collectors.toSet());
|
||||||
}
|
}
|
||||||
|
|
|
@ -361,7 +361,7 @@ public abstract class Compactor<T extends CellSink> {
|
||||||
writer = sinkFactory.createWriter(scanner, fd, dropCache, request.isMajor(),
|
writer = sinkFactory.createWriter(scanner, fd, dropCache, request.isMajor(),
|
||||||
request.getWriterCreationTracker());
|
request.getWriterCreationTracker());
|
||||||
finished = performCompaction(fd, scanner, writer, smallestReadPoint, cleanSeqId,
|
finished = performCompaction(fd, scanner, writer, smallestReadPoint, cleanSeqId,
|
||||||
throughputController, request.isAllFiles(), request.getFiles().size(), progress);
|
throughputController, request, progress);
|
||||||
if (!finished) {
|
if (!finished) {
|
||||||
throw new InterruptedIOException("Aborting compaction of store " + store + " in region "
|
throw new InterruptedIOException("Aborting compaction of store " + store + " in region "
|
||||||
+ store.getRegionInfo().getRegionNameAsString() + " because it was interrupted.");
|
+ store.getRegionInfo().getRegionNameAsString() + " because it was interrupted.");
|
||||||
|
@ -400,20 +400,19 @@ public abstract class Compactor<T extends CellSink> {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Performs the compaction.
|
* Performs the compaction.
|
||||||
* @param fd FileDetails of cell sink writer
|
* @param fd FileDetails of cell sink writer
|
||||||
* @param scanner Where to read from.
|
* @param scanner Where to read from.
|
||||||
* @param writer Where to write to.
|
* @param writer Where to write to.
|
||||||
* @param smallestReadPoint Smallest read point.
|
* @param smallestReadPoint Smallest read point.
|
||||||
* @param cleanSeqId When true, remove seqId(used to be mvcc) value which is <=
|
* @param cleanSeqId When true, remove seqId(used to be mvcc) value which is <=
|
||||||
* smallestReadPoint
|
* smallestReadPoint
|
||||||
* @param major Is a major compaction.
|
* @param request compaction request.
|
||||||
* @param numofFilesToCompact the number of files to compact
|
* @param progress Progress reporter.
|
||||||
* @param progress Progress reporter.
|
|
||||||
* @return Whether compaction ended; false if it was interrupted for some reason.
|
* @return Whether compaction ended; false if it was interrupted for some reason.
|
||||||
*/
|
*/
|
||||||
protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
|
protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
|
||||||
long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController,
|
long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController,
|
||||||
boolean major, int numofFilesToCompact, CompactionProgress progress) throws IOException {
|
CompactionRequestImpl request, CompactionProgress progress) throws IOException {
|
||||||
assert writer instanceof ShipperListener;
|
assert writer instanceof ShipperListener;
|
||||||
long bytesWrittenProgressForLog = 0;
|
long bytesWrittenProgressForLog = 0;
|
||||||
long bytesWrittenProgressForShippedCall = 0;
|
long bytesWrittenProgressForShippedCall = 0;
|
||||||
|
@ -435,7 +434,7 @@ public abstract class Compactor<T extends CellSink> {
|
||||||
throughputController.start(compactionName);
|
throughputController.start(compactionName);
|
||||||
KeyValueScanner kvs = (scanner instanceof KeyValueScanner) ? (KeyValueScanner) scanner : null;
|
KeyValueScanner kvs = (scanner instanceof KeyValueScanner) ? (KeyValueScanner) scanner : null;
|
||||||
long shippedCallSizeLimit =
|
long shippedCallSizeLimit =
|
||||||
(long) numofFilesToCompact * this.store.getColumnFamilyDescriptor().getBlocksize();
|
(long) request.getFiles().size() * this.store.getColumnFamilyDescriptor().getBlocksize();
|
||||||
try {
|
try {
|
||||||
do {
|
do {
|
||||||
hasMore = scanner.next(cells, scannerContext);
|
hasMore = scanner.next(cells, scannerContext);
|
||||||
|
|
|
@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.regionserver.ShipperListener;
|
||||||
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
|
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
|
||||||
import org.apache.hadoop.hbase.regionserver.compactions.CloseChecker;
|
import org.apache.hadoop.hbase.regionserver.compactions.CloseChecker;
|
||||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
|
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
|
||||||
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil;
|
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil;
|
||||||
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
|
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
@ -91,8 +92,9 @@ public class FaultyMobStoreCompactor extends DefaultMobStoreCompactor {
|
||||||
@Override
|
@Override
|
||||||
protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
|
protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
|
||||||
long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController,
|
long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController,
|
||||||
boolean major, int numofFilesToCompact, CompactionProgress progress) throws IOException {
|
CompactionRequestImpl request, CompactionProgress progress) throws IOException {
|
||||||
|
|
||||||
|
boolean major = request.isAllFiles();
|
||||||
totalCompactions.incrementAndGet();
|
totalCompactions.incrementAndGet();
|
||||||
if (major) {
|
if (major) {
|
||||||
totalMajorCompactions.incrementAndGet();
|
totalMajorCompactions.incrementAndGet();
|
||||||
|
@ -143,7 +145,7 @@ public class FaultyMobStoreCompactor extends DefaultMobStoreCompactor {
|
||||||
throughputController.start(compactionName);
|
throughputController.start(compactionName);
|
||||||
KeyValueScanner kvs = (scanner instanceof KeyValueScanner) ? (KeyValueScanner) scanner : null;
|
KeyValueScanner kvs = (scanner instanceof KeyValueScanner) ? (KeyValueScanner) scanner : null;
|
||||||
long shippedCallSizeLimit =
|
long shippedCallSizeLimit =
|
||||||
(long) numofFilesToCompact * this.store.getColumnFamilyDescriptor().getBlocksize();
|
(long) request.getFiles().size() * this.store.getColumnFamilyDescriptor().getBlocksize();
|
||||||
|
|
||||||
Cell mobCell = null;
|
Cell mobCell = null;
|
||||||
|
|
||||||
|
|
|
@ -18,17 +18,26 @@
|
||||||
package org.apache.hadoop.hbase.mob;
|
package org.apache.hadoop.hbase.mob;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.Date;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
import java.util.concurrent.ThreadLocalRandom;
|
import java.util.concurrent.ThreadLocalRandom;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hbase.Cell;
|
import org.apache.hadoop.hbase.Cell;
|
||||||
import org.apache.hadoop.hbase.CellUtil;
|
import org.apache.hadoop.hbase.CellUtil;
|
||||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
import org.apache.hadoop.hbase.KeyValue;
|
import org.apache.hadoop.hbase.KeyValue;
|
||||||
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
|
||||||
import org.apache.hadoop.hbase.client.Result;
|
import org.apache.hadoop.hbase.client.Result;
|
||||||
import org.apache.hadoop.hbase.client.ResultScanner;
|
import org.apache.hadoop.hbase.client.ResultScanner;
|
||||||
import org.apache.hadoop.hbase.client.Scan;
|
import org.apache.hadoop.hbase.client.Scan;
|
||||||
import org.apache.hadoop.hbase.client.Table;
|
import org.apache.hadoop.hbase.client.Table;
|
||||||
|
import org.apache.hadoop.hbase.io.compress.Compression;
|
||||||
|
import org.apache.hadoop.hbase.io.crypto.Encryption;
|
||||||
|
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
|
||||||
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
|
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||||
|
@ -112,4 +121,17 @@ public class MobTestUtil {
|
||||||
scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
|
scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
|
||||||
return util.countRows(table, scan);
|
return util.countRows(table, scan);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static Path generateMOBFileForRegion(Configuration conf, TableName tableName,
|
||||||
|
ColumnFamilyDescriptor familyDescriptor, String regionName) throws IOException {
|
||||||
|
Date date = new Date();
|
||||||
|
String dateStr = MobUtils.formatDate(date);
|
||||||
|
FileSystem fs = FileSystem.get(conf);
|
||||||
|
Path cfMOBDir = MobUtils.getMobFamilyPath(conf, tableName, familyDescriptor.getNameAsString());
|
||||||
|
StoreFileWriter writer = MobUtils.createWriter(conf, fs, familyDescriptor, dateStr, cfMOBDir,
|
||||||
|
1000L, Compression.Algorithm.NONE, "startKey", CacheConfig.DISABLED, Encryption.Context.NONE,
|
||||||
|
false, "");
|
||||||
|
writer.close();
|
||||||
|
return writer.getPath();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,30 +17,37 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.mob;
|
package org.apache.hadoop.hbase.mob;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collection;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import org.apache.hadoop.hbase.Cell;
|
import org.apache.hadoop.hbase.Cell;
|
||||||
import org.apache.hadoop.hbase.CellUtil;
|
import org.apache.hadoop.hbase.CellUtil;
|
||||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
|
||||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
|
||||||
import org.apache.hadoop.hbase.client.Put;
|
import org.apache.hadoop.hbase.client.Put;
|
||||||
import org.apache.hadoop.hbase.client.Result;
|
import org.apache.hadoop.hbase.client.Result;
|
||||||
import org.apache.hadoop.hbase.client.ResultScanner;
|
import org.apache.hadoop.hbase.client.ResultScanner;
|
||||||
import org.apache.hadoop.hbase.client.Scan;
|
import org.apache.hadoop.hbase.client.Scan;
|
||||||
import org.apache.hadoop.hbase.client.Table;
|
import org.apache.hadoop.hbase.client.Table;
|
||||||
|
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||||
|
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
|
||||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.junit.AfterClass;
|
import org.junit.After;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.Before;
|
||||||
import org.junit.ClassRule;
|
import org.junit.ClassRule;
|
||||||
import org.junit.Rule;
|
import org.junit.Rule;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.experimental.categories.Category;
|
import org.junit.experimental.categories.Category;
|
||||||
import org.junit.rules.TestName;
|
import org.junit.rules.TestName;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
import org.junit.runners.Parameterized;
|
||||||
|
|
||||||
|
@RunWith(Parameterized.class)
|
||||||
@Category(LargeTests.class)
|
@Category(LargeTests.class)
|
||||||
public class TestDefaultMobStoreFlusher {
|
public class TestDefaultMobStoreFlusher {
|
||||||
|
|
||||||
|
@ -60,41 +67,52 @@ public class TestDefaultMobStoreFlusher {
|
||||||
@Rule
|
@Rule
|
||||||
public TestName name = new TestName();
|
public TestName name = new TestName();
|
||||||
|
|
||||||
@BeforeClass
|
protected Boolean useFileBasedSFT;
|
||||||
public static void setUpBeforeClass() throws Exception {
|
|
||||||
|
public TestDefaultMobStoreFlusher(Boolean useFileBasedSFT) {
|
||||||
|
this.useFileBasedSFT = useFileBasedSFT;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Parameterized.Parameters
|
||||||
|
public static Collection<Boolean> data() {
|
||||||
|
Boolean[] data = { false, true };
|
||||||
|
return Arrays.asList(data);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUpBefore() throws Exception {
|
||||||
|
if (useFileBasedSFT) {
|
||||||
|
TEST_UTIL.getConfiguration().set(StoreFileTrackerFactory.TRACKER_IMPL,
|
||||||
|
"org.apache.hadoop.hbase.regionserver.storefiletracker.FileBasedStoreFileTracker");
|
||||||
|
}
|
||||||
TEST_UTIL.startMiniCluster(1);
|
TEST_UTIL.startMiniCluster(1);
|
||||||
}
|
}
|
||||||
|
|
||||||
@AfterClass
|
@After
|
||||||
public static void tearDownAfterClass() throws Exception {
|
public void tearDownAfter() throws Exception {
|
||||||
TEST_UTIL.shutdownMiniCluster();
|
TEST_UTIL.shutdownMiniCluster();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testFlushNonMobFile() throws Exception {
|
public void testFlushNonMobFile() throws Exception {
|
||||||
final TableName tableName = TableName.valueOf(name.getMethodName());
|
final TableName tableName = TableName.valueOf(TestMobUtils.getTableName(name));
|
||||||
HTableDescriptor desc = new HTableDescriptor(tableName);
|
TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName)
|
||||||
HColumnDescriptor hcd = new HColumnDescriptor(family);
|
.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(4).build())
|
||||||
hcd.setMaxVersions(4);
|
.build();
|
||||||
desc.addFamily(hcd);
|
testFlushFile(tableDescriptor);
|
||||||
|
|
||||||
testFlushFile(desc);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testFlushMobFile() throws Exception {
|
public void testFlushMobFile() throws Exception {
|
||||||
final TableName tableName = TableName.valueOf(name.getMethodName());
|
final TableName tableName = TableName.valueOf(TestMobUtils.getTableName(name));
|
||||||
HTableDescriptor desc = new HTableDescriptor(tableName);
|
TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName)
|
||||||
HColumnDescriptor hcd = new HColumnDescriptor(family);
|
.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(family).setMobEnabled(true)
|
||||||
hcd.setMobEnabled(true);
|
.setMobThreshold(3L).setMaxVersions(4).build())
|
||||||
hcd.setMobThreshold(3L);
|
.build();
|
||||||
hcd.setMaxVersions(4);
|
testFlushFile(tableDescriptor);
|
||||||
desc.addFamily(hcd);
|
|
||||||
|
|
||||||
testFlushFile(desc);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void testFlushFile(HTableDescriptor htd) throws Exception {
|
private void testFlushFile(TableDescriptor htd) throws Exception {
|
||||||
Table table = null;
|
Table table = null;
|
||||||
try {
|
try {
|
||||||
table = TEST_UTIL.createTable(htd, null);
|
table = TEST_UTIL.createTable(htd, null);
|
||||||
|
|
|
@ -1,218 +0,0 @@
|
||||||
/*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.hadoop.hbase.mob;
|
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
|
||||||
import static org.junit.Assert.assertTrue;
|
|
||||||
import static org.junit.Assert.fail;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.Arrays;
|
|
||||||
import java.util.Random;
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
|
||||||
import org.apache.hadoop.fs.Path;
|
|
||||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
|
||||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
|
||||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
|
||||||
import org.apache.hadoop.hbase.TableName;
|
|
||||||
import org.apache.hadoop.hbase.client.Admin;
|
|
||||||
import org.apache.hadoop.hbase.client.CompactionState;
|
|
||||||
import org.apache.hadoop.hbase.client.Put;
|
|
||||||
import org.apache.hadoop.hbase.client.Result;
|
|
||||||
import org.apache.hadoop.hbase.client.ResultScanner;
|
|
||||||
import org.apache.hadoop.hbase.client.Table;
|
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
|
||||||
import org.apache.hadoop.hbase.util.RegionSplitter;
|
|
||||||
import org.junit.After;
|
|
||||||
import org.junit.Before;
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Mob file compaction base test. 1. Enables batch mode for regular MOB compaction, Sets batch size
|
|
||||||
* to 7 regions. (Optional) 2. Disables periodic MOB compactions, sets minimum age to archive to 10
|
|
||||||
* sec 3. Creates MOB table with 20 regions 4. Loads MOB data (randomized keys, 1000 rows), flushes
|
|
||||||
* data. 5. Repeats 4. two more times 6. Verifies that we have 20 *3 = 60 mob files (equals to
|
|
||||||
* number of regions x 3) 7. Runs major MOB compaction. 8. Verifies that number of MOB files in a
|
|
||||||
* mob directory is 20 x4 = 80 9. Waits for a period of time larger than minimum age to archive 10.
|
|
||||||
* Runs Mob cleaner chore 11 Verifies that number of MOB files in a mob directory is 20. 12 Runs
|
|
||||||
* scanner and checks all 3 * 1000 rows.
|
|
||||||
*/
|
|
||||||
@SuppressWarnings("deprecation")
|
|
||||||
public abstract class TestMobCompactionBase {
|
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(TestMobCompactionBase.class);
|
|
||||||
|
|
||||||
protected HBaseTestingUtility HTU;
|
|
||||||
|
|
||||||
protected final static String famStr = "f1";
|
|
||||||
protected final static byte[] fam = Bytes.toBytes(famStr);
|
|
||||||
protected final static byte[] qualifier = Bytes.toBytes("q1");
|
|
||||||
protected final static long mobLen = 10;
|
|
||||||
protected final static byte[] mobVal = Bytes
|
|
||||||
.toBytes("01234567890123456789012345678901234567890123456789012345678901234567890123456789");
|
|
||||||
|
|
||||||
protected Configuration conf;
|
|
||||||
protected HTableDescriptor hdt;
|
|
||||||
private HColumnDescriptor hcd;
|
|
||||||
protected Admin admin;
|
|
||||||
protected Table table = null;
|
|
||||||
protected long minAgeToArchive = 10000;
|
|
||||||
protected int numRegions = 20;
|
|
||||||
protected int rows = 1000;
|
|
||||||
|
|
||||||
protected MobFileCleanerChore cleanerChore;
|
|
||||||
|
|
||||||
public TestMobCompactionBase() {
|
|
||||||
}
|
|
||||||
|
|
||||||
@Before
|
|
||||||
public void setUp() throws Exception {
|
|
||||||
HTU = new HBaseTestingUtility();
|
|
||||||
hdt = HTU.createTableDescriptor(getClass().getName());
|
|
||||||
conf = HTU.getConfiguration();
|
|
||||||
|
|
||||||
initConf();
|
|
||||||
|
|
||||||
HTU.startMiniCluster();
|
|
||||||
admin = HTU.getAdmin();
|
|
||||||
cleanerChore = new MobFileCleanerChore();
|
|
||||||
hcd = new HColumnDescriptor(fam);
|
|
||||||
hcd.setMobEnabled(true);
|
|
||||||
hcd.setMobThreshold(mobLen);
|
|
||||||
hcd.setMaxVersions(1);
|
|
||||||
hdt.addFamily(hcd);
|
|
||||||
RegionSplitter.UniformSplit splitAlgo = new RegionSplitter.UniformSplit();
|
|
||||||
byte[][] splitKeys = splitAlgo.split(numRegions);
|
|
||||||
table = HTU.createTable(hdt, splitKeys);
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
protected void initConf() {
|
|
||||||
|
|
||||||
conf.setInt("hfile.format.version", 3);
|
|
||||||
// Disable automatic MOB compaction
|
|
||||||
conf.setLong(MobConstants.MOB_COMPACTION_CHORE_PERIOD, 0);
|
|
||||||
// Disable automatic MOB file cleaner chore
|
|
||||||
conf.setLong(MobConstants.MOB_CLEANER_PERIOD, 0);
|
|
||||||
// Set minimum age to archive to 10 sec
|
|
||||||
conf.setLong(MobConstants.MIN_AGE_TO_ARCHIVE_KEY, minAgeToArchive);
|
|
||||||
// Set compacted file discharger interval to a half minAgeToArchive
|
|
||||||
conf.setLong("hbase.hfile.compaction.discharger.interval", minAgeToArchive / 2);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void loadData(int num) {
|
|
||||||
|
|
||||||
Random r = new Random();
|
|
||||||
try {
|
|
||||||
LOG.info("Started loading {} rows", num);
|
|
||||||
for (int i = 0; i < num; i++) {
|
|
||||||
byte[] key = new byte[32];
|
|
||||||
r.nextBytes(key);
|
|
||||||
Put p = new Put(key);
|
|
||||||
p.addColumn(fam, qualifier, mobVal);
|
|
||||||
table.put(p);
|
|
||||||
}
|
|
||||||
admin.flush(table.getName());
|
|
||||||
LOG.info("Finished loading {} rows", num);
|
|
||||||
} catch (Exception e) {
|
|
||||||
LOG.error("MOB file compaction chore test FAILED", e);
|
|
||||||
fail("MOB file compaction chore test FAILED");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@After
|
|
||||||
public void tearDown() throws Exception {
|
|
||||||
admin.disableTable(hdt.getTableName());
|
|
||||||
admin.deleteTable(hdt.getTableName());
|
|
||||||
HTU.shutdownMiniCluster();
|
|
||||||
}
|
|
||||||
|
|
||||||
public void baseTestMobFileCompaction() throws InterruptedException, IOException {
|
|
||||||
|
|
||||||
// Load and flush data 3 times
|
|
||||||
loadData(rows);
|
|
||||||
loadData(rows);
|
|
||||||
loadData(rows);
|
|
||||||
long num = getNumberOfMobFiles(conf, table.getName(), new String(fam));
|
|
||||||
assertEquals(numRegions * 3, num);
|
|
||||||
// Major MOB compact
|
|
||||||
mobCompact(admin, hdt, hcd);
|
|
||||||
// wait until compaction is complete
|
|
||||||
while (admin.getCompactionState(hdt.getTableName()) != CompactionState.NONE) {
|
|
||||||
Thread.sleep(100);
|
|
||||||
}
|
|
||||||
|
|
||||||
num = getNumberOfMobFiles(conf, table.getName(), new String(fam));
|
|
||||||
assertEquals(numRegions * 4, num);
|
|
||||||
// We have guarantee, that compacted file discharger will run during this pause
|
|
||||||
// because it has interval less than this wait time
|
|
||||||
LOG.info("Waiting for {}ms", minAgeToArchive + 1000);
|
|
||||||
|
|
||||||
Thread.sleep(minAgeToArchive + 1000);
|
|
||||||
LOG.info("Cleaning up MOB files");
|
|
||||||
// Cleanup again
|
|
||||||
cleanerChore.cleanupObsoleteMobFiles(conf, table.getName());
|
|
||||||
|
|
||||||
num = getNumberOfMobFiles(conf, table.getName(), new String(fam));
|
|
||||||
assertEquals(numRegions, num);
|
|
||||||
|
|
||||||
long scanned = scanTable();
|
|
||||||
assertEquals(3 * rows, scanned);
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
protected abstract void mobCompact(Admin admin2, HTableDescriptor hdt2, HColumnDescriptor hcd2)
|
|
||||||
throws IOException, InterruptedException;
|
|
||||||
|
|
||||||
protected long getNumberOfMobFiles(Configuration conf, TableName tableName, String family)
|
|
||||||
throws IOException {
|
|
||||||
FileSystem fs = FileSystem.get(conf);
|
|
||||||
Path dir = MobUtils.getMobFamilyPath(conf, tableName, family);
|
|
||||||
FileStatus[] stat = fs.listStatus(dir);
|
|
||||||
for (FileStatus st : stat) {
|
|
||||||
LOG.debug("MOB Directory content: {}", st.getPath());
|
|
||||||
}
|
|
||||||
LOG.debug("MOB Directory content total files: {}", stat.length);
|
|
||||||
|
|
||||||
return stat.length;
|
|
||||||
}
|
|
||||||
|
|
||||||
protected long scanTable() {
|
|
||||||
try {
|
|
||||||
|
|
||||||
Result result;
|
|
||||||
ResultScanner scanner = table.getScanner(fam);
|
|
||||||
long counter = 0;
|
|
||||||
while ((result = scanner.next()) != null) {
|
|
||||||
assertTrue(Arrays.equals(result.getValue(fam, qualifier), mobVal));
|
|
||||||
counter++;
|
|
||||||
}
|
|
||||||
return counter;
|
|
||||||
} catch (Exception e) {
|
|
||||||
LOG.error("MOB file compaction test FAILED", e);
|
|
||||||
if (HTU != null) {
|
|
||||||
fail(e.getMessage());
|
|
||||||
} else {
|
|
||||||
System.exit(-1);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -17,10 +17,8 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.mob;
|
package org.apache.hadoop.hbase.mob;
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||||
import org.junit.BeforeClass;
|
|
||||||
import org.junit.ClassRule;
|
import org.junit.ClassRule;
|
||||||
import org.junit.experimental.categories.Category;
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
|
@ -40,12 +38,13 @@ public class TestMobCompactionOptMode extends TestMobCompactionWithDefaults {
|
||||||
public static final HBaseClassTestRule CLASS_RULE =
|
public static final HBaseClassTestRule CLASS_RULE =
|
||||||
HBaseClassTestRule.forClass(TestMobCompactionOptMode.class);
|
HBaseClassTestRule.forClass(TestMobCompactionOptMode.class);
|
||||||
|
|
||||||
@BeforeClass
|
public TestMobCompactionOptMode(Boolean useFileBasedSFT) {
|
||||||
public static void configureOptimizedCompaction() throws InterruptedException, IOException {
|
super(useFileBasedSFT);
|
||||||
HTU.shutdownMiniHBaseCluster();
|
}
|
||||||
|
|
||||||
|
protected void additonalConfigSetup() {
|
||||||
conf.set(MobConstants.MOB_COMPACTION_TYPE_KEY, MobConstants.OPTIMIZED_MOB_COMPACTION_TYPE);
|
conf.set(MobConstants.MOB_COMPACTION_TYPE_KEY, MobConstants.OPTIMIZED_MOB_COMPACTION_TYPE);
|
||||||
conf.setLong(MobConstants.MOB_COMPACTION_MAX_FILE_SIZE_KEY, 1000000);
|
conf.setLong(MobConstants.MOB_COMPACTION_MAX_FILE_SIZE_KEY, 1000000);
|
||||||
HTU.startMiniHBaseCluster();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -23,9 +23,10 @@ import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
|
||||||
import org.apache.hadoop.hbase.client.TableDescriptor;
|
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.BeforeClass;
|
|
||||||
import org.junit.ClassRule;
|
import org.junit.ClassRule;
|
||||||
import org.junit.experimental.categories.Category;
|
import org.junit.experimental.categories.Category;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
import org.junit.runners.Parameterized;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
@ -39,6 +40,7 @@ import org.slf4j.LoggerFactory;
|
||||||
* time larger than minimum age to archive 10. Runs Mob cleaner chore 11 Verifies that number of MOB
|
* time larger than minimum age to archive 10. Runs Mob cleaner chore 11 Verifies that number of MOB
|
||||||
* files in a mob directory is 20. 12 Runs scanner and checks all 3 * 1000 rows.
|
* files in a mob directory is 20. 12 Runs scanner and checks all 3 * 1000 rows.
|
||||||
*/
|
*/
|
||||||
|
@RunWith(Parameterized.class)
|
||||||
@Category(LargeTests.class)
|
@Category(LargeTests.class)
|
||||||
public class TestMobCompactionOptRegionBatchMode extends TestMobCompactionWithDefaults {
|
public class TestMobCompactionOptRegionBatchMode extends TestMobCompactionWithDefaults {
|
||||||
private static final Logger LOG =
|
private static final Logger LOG =
|
||||||
|
@ -50,20 +52,20 @@ public class TestMobCompactionOptRegionBatchMode extends TestMobCompactionWithDe
|
||||||
private static final int batchSize = 7;
|
private static final int batchSize = 7;
|
||||||
private MobFileCompactionChore compactionChore;
|
private MobFileCompactionChore compactionChore;
|
||||||
|
|
||||||
|
public TestMobCompactionOptRegionBatchMode(Boolean useFileBasedSFT) {
|
||||||
|
super(useFileBasedSFT);
|
||||||
|
}
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setUp() throws Exception {
|
public void setUp() throws Exception {
|
||||||
super.setUp();
|
super.setUp();
|
||||||
compactionChore = new MobFileCompactionChore(conf, batchSize);
|
compactionChore = new MobFileCompactionChore(conf, batchSize);
|
||||||
}
|
}
|
||||||
|
|
||||||
@BeforeClass
|
protected void additonalConfigSetup() {
|
||||||
public static void configureOptimizedCompactionAndBatches()
|
|
||||||
throws InterruptedException, IOException {
|
|
||||||
HTU.shutdownMiniHBaseCluster();
|
|
||||||
conf.setInt(MobConstants.MOB_MAJOR_COMPACTION_REGION_BATCH_SIZE, batchSize);
|
conf.setInt(MobConstants.MOB_MAJOR_COMPACTION_REGION_BATCH_SIZE, batchSize);
|
||||||
conf.set(MobConstants.MOB_COMPACTION_TYPE_KEY, MobConstants.OPTIMIZED_MOB_COMPACTION_TYPE);
|
conf.set(MobConstants.MOB_COMPACTION_TYPE_KEY, MobConstants.OPTIMIZED_MOB_COMPACTION_TYPE);
|
||||||
conf.setLong(MobConstants.MOB_COMPACTION_MAX_FILE_SIZE_KEY, 1000000);
|
conf.setLong(MobConstants.MOB_COMPACTION_MAX_FILE_SIZE_KEY, 1000000);
|
||||||
HTU.startMiniHBaseCluster();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -1,68 +0,0 @@
|
||||||
/*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.hadoop.hbase.mob;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
|
||||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
|
||||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
|
||||||
import org.apache.hadoop.hbase.client.Admin;
|
|
||||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
|
||||||
import org.junit.ClassRule;
|
|
||||||
import org.junit.Test;
|
|
||||||
import org.junit.experimental.categories.Category;
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Mob file compaction chore in a regular non-batch mode test. 1. Uses default (non-batch) mode for
|
|
||||||
* regular MOB compaction, 2. Disables periodic MOB compactions, sets minimum age to archive to 10
|
|
||||||
* sec 3. Creates MOB table with 20 regions 4. Loads MOB data (randomized keys, 1000 rows), flushes
|
|
||||||
* data. 5. Repeats 4. two more times 6. Verifies that we have 20 *3 = 60 mob files (equals to
|
|
||||||
* number of regions x 3) 7. Runs major MOB compaction. 8. Verifies that number of MOB files in a
|
|
||||||
* mob directory is 20 x4 = 80 9. Waits for a period of time larger than minimum age to archive 10.
|
|
||||||
* Runs Mob cleaner chore 11 Verifies that number of MOB files in a mob directory is 20. 12 Runs
|
|
||||||
* scanner and checks all 3 * 1000 rows.
|
|
||||||
*/
|
|
||||||
@SuppressWarnings("deprecation")
|
|
||||||
@Category(LargeTests.class)
|
|
||||||
public class TestMobCompactionRegularMode extends TestMobCompactionBase {
|
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(TestMobCompactionRegularMode.class);
|
|
||||||
@ClassRule
|
|
||||||
public static final HBaseClassTestRule CLASS_RULE =
|
|
||||||
HBaseClassTestRule.forClass(TestMobCompactionRegularMode.class);
|
|
||||||
|
|
||||||
public TestMobCompactionRegularMode() {
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testMobFileCompactionBatchMode() throws InterruptedException, IOException {
|
|
||||||
LOG.info("MOB compaction regular mode started");
|
|
||||||
baseTestMobFileCompaction();
|
|
||||||
LOG.info("MOB compaction regular mode finished OK");
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void mobCompact(Admin admin, HTableDescriptor hdt, HColumnDescriptor hcd)
|
|
||||||
throws IOException, InterruptedException {
|
|
||||||
// Major compact MOB table
|
|
||||||
admin.majorCompact(hdt.getTableName(), hcd.getName());
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -23,9 +23,10 @@ import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
|
||||||
import org.apache.hadoop.hbase.client.TableDescriptor;
|
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.BeforeClass;
|
|
||||||
import org.junit.ClassRule;
|
import org.junit.ClassRule;
|
||||||
import org.junit.experimental.categories.Category;
|
import org.junit.experimental.categories.Category;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
import org.junit.runners.Parameterized;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
@ -39,6 +40,7 @@ import org.slf4j.LoggerFactory;
|
||||||
* to archive 10. Runs Mob cleaner chore 11 Verifies that number of MOB files in a mob directory is
|
* to archive 10. Runs Mob cleaner chore 11 Verifies that number of MOB files in a mob directory is
|
||||||
* 20. 12 Runs scanner and checks all 3 * 1000 rows.
|
* 20. 12 Runs scanner and checks all 3 * 1000 rows.
|
||||||
*/
|
*/
|
||||||
|
@RunWith(Parameterized.class)
|
||||||
@Category(LargeTests.class)
|
@Category(LargeTests.class)
|
||||||
public class TestMobCompactionRegularRegionBatchMode extends TestMobCompactionWithDefaults {
|
public class TestMobCompactionRegularRegionBatchMode extends TestMobCompactionWithDefaults {
|
||||||
private static final Logger LOG =
|
private static final Logger LOG =
|
||||||
|
@ -50,17 +52,18 @@ public class TestMobCompactionRegularRegionBatchMode extends TestMobCompactionWi
|
||||||
private static final int batchSize = 7;
|
private static final int batchSize = 7;
|
||||||
private MobFileCompactionChore compactionChore;
|
private MobFileCompactionChore compactionChore;
|
||||||
|
|
||||||
|
public TestMobCompactionRegularRegionBatchMode(Boolean useFileBasedSFT) {
|
||||||
|
super(useFileBasedSFT);
|
||||||
|
}
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setUp() throws Exception {
|
public void setUp() throws Exception {
|
||||||
super.setUp();
|
super.setUp();
|
||||||
compactionChore = new MobFileCompactionChore(conf, batchSize);
|
compactionChore = new MobFileCompactionChore(conf, batchSize);
|
||||||
}
|
}
|
||||||
|
|
||||||
@BeforeClass
|
protected void additonalConfigSetup() {
|
||||||
public static void configureCompactionBatches() throws InterruptedException, IOException {
|
|
||||||
HTU.shutdownMiniHBaseCluster();
|
|
||||||
conf.setInt(MobConstants.MOB_MAJOR_COMPACTION_REGION_BATCH_SIZE, batchSize);
|
conf.setInt(MobConstants.MOB_MAJOR_COMPACTION_REGION_BATCH_SIZE, batchSize);
|
||||||
HTU.startMiniHBaseCluster();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -23,6 +23,7 @@ import static org.junit.Assert.fail;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
import java.util.Collection;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
@ -32,6 +33,7 @@ import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.client.Admin;
|
import org.apache.hadoop.hbase.client.Admin;
|
||||||
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
|
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
|
||||||
|
@ -43,18 +45,19 @@ import org.apache.hadoop.hbase.client.ResultScanner;
|
||||||
import org.apache.hadoop.hbase.client.Table;
|
import org.apache.hadoop.hbase.client.Table;
|
||||||
import org.apache.hadoop.hbase.client.TableDescriptor;
|
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||||
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
|
||||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.hbase.util.RegionSplitter;
|
import org.apache.hadoop.hbase.util.RegionSplitter;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.AfterClass;
|
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.BeforeClass;
|
|
||||||
import org.junit.ClassRule;
|
import org.junit.ClassRule;
|
||||||
import org.junit.Rule;
|
import org.junit.Rule;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.experimental.categories.Category;
|
import org.junit.experimental.categories.Category;
|
||||||
import org.junit.rules.TestName;
|
import org.junit.rules.TestName;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
import org.junit.runners.Parameterized;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
@ -68,6 +71,7 @@ import org.slf4j.LoggerFactory;
|
||||||
* Runs Mob cleaner chore 11 Verifies that number of MOB files in a mob directory is 20. 12 Runs
|
* Runs Mob cleaner chore 11 Verifies that number of MOB files in a mob directory is 20. 12 Runs
|
||||||
* scanner and checks all 3 * 1000 rows.
|
* scanner and checks all 3 * 1000 rows.
|
||||||
*/
|
*/
|
||||||
|
@RunWith(Parameterized.class)
|
||||||
@Category(LargeTests.class)
|
@Category(LargeTests.class)
|
||||||
public class TestMobCompactionWithDefaults {
|
public class TestMobCompactionWithDefaults {
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(TestMobCompactionWithDefaults.class);
|
private static final Logger LOG = LoggerFactory.getLogger(TestMobCompactionWithDefaults.class);
|
||||||
|
@ -97,8 +101,19 @@ public class TestMobCompactionWithDefaults {
|
||||||
|
|
||||||
protected MobFileCleanerChore cleanerChore;
|
protected MobFileCleanerChore cleanerChore;
|
||||||
|
|
||||||
@BeforeClass
|
protected Boolean useFileBasedSFT;
|
||||||
public static void htuStart() throws Exception {
|
|
||||||
|
public TestMobCompactionWithDefaults(Boolean useFileBasedSFT) {
|
||||||
|
this.useFileBasedSFT = useFileBasedSFT;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Parameterized.Parameters
|
||||||
|
public static Collection<Boolean> data() {
|
||||||
|
Boolean[] data = { false, true };
|
||||||
|
return Arrays.asList(data);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void htuStart() throws Exception {
|
||||||
HTU = new HBaseTestingUtility();
|
HTU = new HBaseTestingUtility();
|
||||||
conf = HTU.getConfiguration();
|
conf = HTU.getConfiguration();
|
||||||
conf.setInt("hfile.format.version", 3);
|
conf.setInt("hfile.format.version", 3);
|
||||||
|
@ -111,17 +126,21 @@ public class TestMobCompactionWithDefaults {
|
||||||
// Set compacted file discharger interval to a half minAgeToArchive
|
// Set compacted file discharger interval to a half minAgeToArchive
|
||||||
conf.setLong("hbase.hfile.compaction.discharger.interval", minAgeToArchive / 2);
|
conf.setLong("hbase.hfile.compaction.discharger.interval", minAgeToArchive / 2);
|
||||||
conf.setBoolean("hbase.regionserver.compaction.enabled", false);
|
conf.setBoolean("hbase.regionserver.compaction.enabled", false);
|
||||||
|
if (useFileBasedSFT) {
|
||||||
|
conf.set(StoreFileTrackerFactory.TRACKER_IMPL,
|
||||||
|
"org.apache.hadoop.hbase.regionserver.storefiletracker.FileBasedStoreFileTracker");
|
||||||
|
}
|
||||||
|
additonalConfigSetup();
|
||||||
HTU.startMiniCluster();
|
HTU.startMiniCluster();
|
||||||
}
|
}
|
||||||
|
|
||||||
@AfterClass
|
protected void additonalConfigSetup() {
|
||||||
public static void htuStop() throws Exception {
|
|
||||||
HTU.shutdownMiniCluster();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setUp() throws Exception {
|
public void setUp() throws Exception {
|
||||||
tableDescriptor = HTU.createModifyableTableDescriptor(test.getMethodName());
|
htuStart();
|
||||||
|
tableDescriptor = HTU.createModifyableTableDescriptor(TestMobUtils.getTableName(test));
|
||||||
admin = HTU.getAdmin();
|
admin = HTU.getAdmin();
|
||||||
cleanerChore = new MobFileCleanerChore();
|
cleanerChore = new MobFileCleanerChore();
|
||||||
familyDescriptor = new ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor(fam);
|
familyDescriptor = new ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor(fam);
|
||||||
|
@ -158,6 +177,7 @@ public class TestMobCompactionWithDefaults {
|
||||||
public void tearDown() throws Exception {
|
public void tearDown() throws Exception {
|
||||||
admin.disableTable(tableDescriptor.getTableName());
|
admin.disableTable(tableDescriptor.getTableName());
|
||||||
admin.deleteTable(tableDescriptor.getTableName());
|
admin.deleteTable(tableDescriptor.getTableName());
|
||||||
|
HTU.shutdownMiniCluster();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -173,12 +193,12 @@ public class TestMobCompactionWithDefaults {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testMobFileCompactionAfterSnapshotClone() throws InterruptedException, IOException {
|
public void testMobFileCompactionAfterSnapshotClone() throws InterruptedException, IOException {
|
||||||
final TableName clone = TableName.valueOf(test.getMethodName() + "-clone");
|
final TableName clone = TableName.valueOf(TestMobUtils.getTableName(test) + "-clone");
|
||||||
LOG.info("MOB compaction of cloned snapshot, " + description() + " started");
|
LOG.info("MOB compaction of cloned snapshot, " + description() + " started");
|
||||||
loadAndFlushThreeTimes(rows, table, famStr);
|
loadAndFlushThreeTimes(rows, table, famStr);
|
||||||
LOG.debug("Taking snapshot and cloning table {}", table);
|
LOG.debug("Taking snapshot and cloning table {}", table);
|
||||||
admin.snapshot(test.getMethodName(), table);
|
admin.snapshot(TestMobUtils.getTableName(test), table);
|
||||||
admin.cloneSnapshot(test.getMethodName(), clone);
|
admin.cloneSnapshot(TestMobUtils.getTableName(test), clone);
|
||||||
assertEquals("Should have 3 hlinks per region in MOB area from snapshot clone", 3 * numRegions,
|
assertEquals("Should have 3 hlinks per region in MOB area from snapshot clone", 3 * numRegions,
|
||||||
getNumberOfMobFiles(clone, famStr));
|
getNumberOfMobFiles(clone, famStr));
|
||||||
mobCompact(admin.getDescriptor(clone), familyDescriptor);
|
mobCompact(admin.getDescriptor(clone), familyDescriptor);
|
||||||
|
@ -191,12 +211,12 @@ public class TestMobCompactionWithDefaults {
|
||||||
@Test
|
@Test
|
||||||
public void testMobFileCompactionAfterSnapshotCloneAndFlush()
|
public void testMobFileCompactionAfterSnapshotCloneAndFlush()
|
||||||
throws InterruptedException, IOException {
|
throws InterruptedException, IOException {
|
||||||
final TableName clone = TableName.valueOf(test.getMethodName() + "-clone");
|
final TableName clone = TableName.valueOf(TestMobUtils.getTableName(test) + "-clone");
|
||||||
LOG.info("MOB compaction of cloned snapshot after flush, " + description() + " started");
|
LOG.info("MOB compaction of cloned snapshot after flush, " + description() + " started");
|
||||||
loadAndFlushThreeTimes(rows, table, famStr);
|
loadAndFlushThreeTimes(rows, table, famStr);
|
||||||
LOG.debug("Taking snapshot and cloning table {}", table);
|
LOG.debug("Taking snapshot and cloning table {}", table);
|
||||||
admin.snapshot(test.getMethodName(), table);
|
admin.snapshot(TestMobUtils.getTableName(test), table);
|
||||||
admin.cloneSnapshot(test.getMethodName(), clone);
|
admin.cloneSnapshot(TestMobUtils.getTableName(test), clone);
|
||||||
assertEquals("Should have 3 hlinks per region in MOB area from snapshot clone", 3 * numRegions,
|
assertEquals("Should have 3 hlinks per region in MOB area from snapshot clone", 3 * numRegions,
|
||||||
getNumberOfMobFiles(clone, famStr));
|
getNumberOfMobFiles(clone, famStr));
|
||||||
loadAndFlushThreeTimes(rows, clone, famStr);
|
loadAndFlushThreeTimes(rows, clone, famStr);
|
||||||
|
@ -275,8 +295,11 @@ public class TestMobCompactionWithDefaults {
|
||||||
|
|
||||||
Thread.sleep(minAgeToArchive + 1000);
|
Thread.sleep(minAgeToArchive + 1000);
|
||||||
LOG.info("Cleaning up MOB files");
|
LOG.info("Cleaning up MOB files");
|
||||||
// Cleanup again
|
|
||||||
cleanerChore.cleanupObsoleteMobFiles(conf, table);
|
// run cleaner chore on each RS
|
||||||
|
for (ServerName sn : admin.getRegionServers()) {
|
||||||
|
HTU.getMiniHBaseCluster().getRegionServer(sn).getRSMobFileCleanerChore().chore();
|
||||||
|
}
|
||||||
|
|
||||||
assertEquals("After cleaning, we should have 1 MOB file per region based on size.", numRegions,
|
assertEquals("After cleaning, we should have 1 MOB file per region based on size.", numRegions,
|
||||||
getNumberOfMobFiles(table, family));
|
getNumberOfMobFiles(table, family));
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
package org.apache.hadoop.hbase.mob;
|
package org.apache.hadoop.hbase.mob;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
@ -32,6 +33,8 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.client.Admin;
|
import org.apache.hadoop.hbase.client.Admin;
|
||||||
|
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
|
||||||
|
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
|
||||||
import org.apache.hadoop.hbase.client.CompactionState;
|
import org.apache.hadoop.hbase.client.CompactionState;
|
||||||
import org.apache.hadoop.hbase.client.Put;
|
import org.apache.hadoop.hbase.client.Put;
|
||||||
import org.apache.hadoop.hbase.client.Result;
|
import org.apache.hadoop.hbase.client.Result;
|
||||||
|
@ -168,14 +171,40 @@ public class TestMobFileCleanerChore {
|
||||||
|
|
||||||
Thread.sleep(minAgeToArchive + 1000);
|
Thread.sleep(minAgeToArchive + 1000);
|
||||||
LOG.info("Cleaning up MOB files");
|
LOG.info("Cleaning up MOB files");
|
||||||
// Cleanup again
|
// Cleanup
|
||||||
chore.cleanupObsoleteMobFiles(conf, table.getName());
|
chore.cleanupObsoleteMobFiles(conf, table.getName());
|
||||||
|
|
||||||
|
// verify that nothing have happened
|
||||||
num = getNumberOfMobFiles(conf, table.getName(), new String(fam));
|
num = getNumberOfMobFiles(conf, table.getName(), new String(fam));
|
||||||
assertEquals(1, num);
|
assertEquals(4, num);
|
||||||
|
|
||||||
long scanned = scanTable();
|
long scanned = scanTable();
|
||||||
assertEquals(30, scanned);
|
assertEquals(30, scanned);
|
||||||
|
|
||||||
|
// add a MOB file to with a name refering to a non-existing region
|
||||||
|
ColumnFamilyDescriptor familyDescriptor = ColumnFamilyDescriptorBuilder.newBuilder(fam)
|
||||||
|
.setMobEnabled(true).setMobThreshold(mobLen).setMaxVersions(1).build();
|
||||||
|
Path extraMOBFile = MobTestUtil.generateMOBFileForRegion(conf, table.getName(),
|
||||||
|
familyDescriptor, "nonExistentRegion");
|
||||||
|
num = getNumberOfMobFiles(conf, table.getName(), new String(fam));
|
||||||
|
assertEquals(5, num);
|
||||||
|
|
||||||
|
LOG.info("Waiting for {}ms", minAgeToArchive + 1000);
|
||||||
|
|
||||||
|
Thread.sleep(minAgeToArchive + 1000);
|
||||||
|
LOG.info("Cleaning up MOB files");
|
||||||
|
chore.cleanupObsoleteMobFiles(conf, table.getName());
|
||||||
|
|
||||||
|
// check that the extra file got deleted
|
||||||
|
num = getNumberOfMobFiles(conf, table.getName(), new String(fam));
|
||||||
|
assertEquals(4, num);
|
||||||
|
|
||||||
|
FileSystem fs = FileSystem.get(conf);
|
||||||
|
assertFalse(fs.exists(extraMOBFile));
|
||||||
|
|
||||||
|
scanned = scanTable();
|
||||||
|
assertEquals(30, scanned);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private long getNumberOfMobFiles(Configuration conf, TableName tableName, String family)
|
private long getNumberOfMobFiles(Configuration conf, TableName tableName, String family)
|
||||||
|
|
|
@ -26,11 +26,14 @@ import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collection;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.UUID;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
@ -65,6 +68,7 @@ import org.apache.hadoop.hbase.regionserver.InternalScanner;
|
||||||
import org.apache.hadoop.hbase.regionserver.RegionAsTable;
|
import org.apache.hadoop.hbase.regionserver.RegionAsTable;
|
||||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
|
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
|
||||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
|
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
|
||||||
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
|
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
|
||||||
import org.apache.hadoop.hbase.security.User;
|
import org.apache.hadoop.hbase.security.User;
|
||||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
|
@ -78,12 +82,15 @@ import org.junit.Rule;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.experimental.categories.Category;
|
import org.junit.experimental.categories.Category;
|
||||||
import org.junit.rules.TestName;
|
import org.junit.rules.TestName;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
import org.junit.runners.Parameterized;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test mob store compaction
|
* Test mob store compaction
|
||||||
*/
|
*/
|
||||||
|
@RunWith(Parameterized.class)
|
||||||
@Category(MediumTests.class)
|
@Category(MediumTests.class)
|
||||||
public class TestMobStoreCompaction {
|
public class TestMobStoreCompaction {
|
||||||
|
|
||||||
|
@ -108,13 +115,30 @@ public class TestMobStoreCompaction {
|
||||||
private final byte[] STARTROW = Bytes.toBytes(START_KEY);
|
private final byte[] STARTROW = Bytes.toBytes(START_KEY);
|
||||||
private int compactionThreshold;
|
private int compactionThreshold;
|
||||||
|
|
||||||
|
private Boolean useFileBasedSFT;
|
||||||
|
|
||||||
|
public TestMobStoreCompaction(Boolean useFileBasedSFT) {
|
||||||
|
this.useFileBasedSFT = useFileBasedSFT;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Parameterized.Parameters
|
||||||
|
public static Collection<Boolean> data() {
|
||||||
|
Boolean[] data = { false, true };
|
||||||
|
return Arrays.asList(data);
|
||||||
|
}
|
||||||
|
|
||||||
private void init(Configuration conf, long mobThreshold) throws Exception {
|
private void init(Configuration conf, long mobThreshold) throws Exception {
|
||||||
|
if (useFileBasedSFT) {
|
||||||
|
conf.set(StoreFileTrackerFactory.TRACKER_IMPL,
|
||||||
|
"org.apache.hadoop.hbase.regionserver.storefiletracker.FileBasedStoreFileTracker");
|
||||||
|
}
|
||||||
|
|
||||||
this.conf = conf;
|
this.conf = conf;
|
||||||
this.mobCellThreshold = mobThreshold;
|
this.mobCellThreshold = mobThreshold;
|
||||||
HBaseTestingUtility UTIL = new HBaseTestingUtility(conf);
|
HBaseTestingUtility UTIL = new HBaseTestingUtility(conf);
|
||||||
|
|
||||||
compactionThreshold = conf.getInt("hbase.hstore.compactionThreshold", 3);
|
compactionThreshold = conf.getInt("hbase.hstore.compactionThreshold", 3);
|
||||||
htd = UTIL.createTableDescriptor(name.getMethodName());
|
htd = UTIL.createTableDescriptor(TestMobUtils.getTableName(name));
|
||||||
hcd = new HColumnDescriptor(COLUMN_FAMILY);
|
hcd = new HColumnDescriptor(COLUMN_FAMILY);
|
||||||
hcd.setMobEnabled(true);
|
hcd.setMobEnabled(true);
|
||||||
hcd.setMobThreshold(mobThreshold);
|
hcd.setMobThreshold(mobThreshold);
|
||||||
|
@ -227,7 +251,7 @@ public class TestMobStoreCompaction {
|
||||||
Path basedir = new Path(hbaseRootDir, htd.getNameAsString());
|
Path basedir = new Path(hbaseRootDir, htd.getNameAsString());
|
||||||
List<Pair<byte[], String>> hfiles = new ArrayList<>(1);
|
List<Pair<byte[], String>> hfiles = new ArrayList<>(1);
|
||||||
for (int i = 0; i < compactionThreshold; i++) {
|
for (int i = 0; i < compactionThreshold; i++) {
|
||||||
Path hpath = new Path(basedir, "hfile" + i);
|
Path hpath = new Path(basedir, UUID.randomUUID().toString().replace("-", ""));
|
||||||
hfiles.add(Pair.newPair(COLUMN_FAMILY, hpath.toString()));
|
hfiles.add(Pair.newPair(COLUMN_FAMILY, hpath.toString()));
|
||||||
createHFile(hpath, i, dummyData);
|
createHFile(hpath, i, dummyData);
|
||||||
}
|
}
|
||||||
|
|
|
@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.junit.ClassRule;
|
import org.junit.ClassRule;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.experimental.categories.Category;
|
import org.junit.experimental.categories.Category;
|
||||||
|
import org.junit.rules.TestName;
|
||||||
|
|
||||||
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
|
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
|
||||||
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSetMultimap;
|
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSetMultimap;
|
||||||
|
@ -89,4 +90,8 @@ public class TestMobUtils {
|
||||||
assertTrue(testTable3Refs.contains("file3a"));
|
assertTrue(testTable3Refs.contains("file3a"));
|
||||||
assertTrue(testTable3Refs.contains("file3b"));
|
assertTrue(testTable3Refs.contains("file3b"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static String getTableName(TestName test) {
|
||||||
|
return test.getMethodName().replace("[", "-").replace("]", "");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,263 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.mob;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.client.Admin;
|
||||||
|
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
|
||||||
|
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
|
||||||
|
import org.apache.hadoop.hbase.client.CompactionState;
|
||||||
|
import org.apache.hadoop.hbase.client.Put;
|
||||||
|
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||||
|
import org.apache.hadoop.hbase.client.Result;
|
||||||
|
import org.apache.hadoop.hbase.client.ResultScanner;
|
||||||
|
import org.apache.hadoop.hbase.client.Table;
|
||||||
|
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
||||||
|
import org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.ClassRule;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Mob file cleaner chore test. 1. Creates MOB table 2. Load MOB data and flushes it N times 3. Runs
|
||||||
|
* major MOB compaction 4. Verifies that number of MOB files in a mob directory is N+1 5. Waits for
|
||||||
|
* a period of time larger than minimum age to archive 6. Runs Mob cleaner chore 7 Verifies that
|
||||||
|
* every old MOB file referenced from current RS was archived
|
||||||
|
*/
|
||||||
|
@Category(MediumTests.class)
|
||||||
|
public class TestRSMobFileCleanerChore {
|
||||||
|
private static final Logger LOG = LoggerFactory.getLogger(TestRSMobFileCleanerChore.class);
|
||||||
|
@ClassRule
|
||||||
|
public static final HBaseClassTestRule CLASS_RULE =
|
||||||
|
HBaseClassTestRule.forClass(TestRSMobFileCleanerChore.class);
|
||||||
|
|
||||||
|
private HBaseTestingUtility HTU;
|
||||||
|
|
||||||
|
private final static String famStr = "f1";
|
||||||
|
private final static byte[] fam = Bytes.toBytes(famStr);
|
||||||
|
private final static byte[] qualifier = Bytes.toBytes("q1");
|
||||||
|
private final static long mobLen = 10;
|
||||||
|
private final static byte[] mobVal = Bytes
|
||||||
|
.toBytes("01234567890123456789012345678901234567890123456789012345678901234567890123456789");
|
||||||
|
|
||||||
|
private Configuration conf;
|
||||||
|
private TableDescriptorBuilder.ModifyableTableDescriptor tableDescriptor;
|
||||||
|
private ColumnFamilyDescriptor familyDescriptor;
|
||||||
|
private Admin admin;
|
||||||
|
private Table table = null;
|
||||||
|
private RSMobFileCleanerChore chore;
|
||||||
|
private long minAgeToArchive = 10000;
|
||||||
|
|
||||||
|
public TestRSMobFileCleanerChore() {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() throws Exception {
|
||||||
|
HTU = new HBaseTestingUtility();
|
||||||
|
conf = HTU.getConfiguration();
|
||||||
|
|
||||||
|
initConf();
|
||||||
|
|
||||||
|
HTU.startMiniCluster();
|
||||||
|
admin = HTU.getAdmin();
|
||||||
|
familyDescriptor = ColumnFamilyDescriptorBuilder.newBuilder(fam).setMobEnabled(true)
|
||||||
|
.setMobThreshold(mobLen).setMaxVersions(1).build();
|
||||||
|
tableDescriptor =
|
||||||
|
HTU.createModifyableTableDescriptor("testMobCompactTable").setColumnFamily(familyDescriptor);
|
||||||
|
table = HTU.createTable(tableDescriptor, Bytes.toByteArrays("1"));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void initConf() {
|
||||||
|
|
||||||
|
conf.setInt("hfile.format.version", 3);
|
||||||
|
conf.setLong(TimeToLiveHFileCleaner.TTL_CONF_KEY, 0);
|
||||||
|
conf.setInt("hbase.client.retries.number", 100);
|
||||||
|
conf.setInt("hbase.hregion.max.filesize", 200000000);
|
||||||
|
conf.setInt("hbase.hregion.memstore.flush.size", 800000);
|
||||||
|
conf.setInt("hbase.hstore.blockingStoreFiles", 150);
|
||||||
|
conf.setInt("hbase.hstore.compaction.throughput.lower.bound", 52428800);
|
||||||
|
conf.setInt("hbase.hstore.compaction.throughput.higher.bound", 2 * 52428800);
|
||||||
|
// conf.set(MobStoreEngine.DEFAULT_MOB_COMPACTOR_CLASS_KEY,
|
||||||
|
// FaultyMobStoreCompactor.class.getName());
|
||||||
|
// Disable automatic MOB compaction
|
||||||
|
conf.setLong(MobConstants.MOB_COMPACTION_CHORE_PERIOD, 0);
|
||||||
|
// Disable automatic MOB file cleaner chore
|
||||||
|
conf.setLong(MobConstants.MOB_CLEANER_PERIOD, 0);
|
||||||
|
// Set minimum age to archive to 10 sec
|
||||||
|
conf.setLong(MobConstants.MIN_AGE_TO_ARCHIVE_KEY, minAgeToArchive);
|
||||||
|
// Set compacted file discharger interval to a half minAgeToArchive
|
||||||
|
conf.setLong("hbase.hfile.compaction.discharger.interval", minAgeToArchive / 2);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void loadData(int start, int num) {
|
||||||
|
try {
|
||||||
|
|
||||||
|
for (int i = 0; i < num; i++) {
|
||||||
|
Put p = new Put(Bytes.toBytes(start + i));
|
||||||
|
p.addColumn(fam, qualifier, mobVal);
|
||||||
|
table.put(p);
|
||||||
|
}
|
||||||
|
admin.flush(table.getName());
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.error("MOB file cleaner chore test FAILED", e);
|
||||||
|
assertTrue(false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown() throws Exception {
|
||||||
|
admin.disableTable(tableDescriptor.getTableName());
|
||||||
|
admin.deleteTable(tableDescriptor.getTableName());
|
||||||
|
HTU.shutdownMiniCluster();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMobFileCleanerChore() throws InterruptedException, IOException {
|
||||||
|
loadData(0, 10);
|
||||||
|
loadData(10, 10);
|
||||||
|
// loadData(20, 10);
|
||||||
|
long num = getNumberOfMobFiles(conf, table.getName(), new String(fam));
|
||||||
|
assertEquals(2, num);
|
||||||
|
// Major compact
|
||||||
|
admin.majorCompact(tableDescriptor.getTableName(), fam);
|
||||||
|
// wait until compaction is complete
|
||||||
|
while (admin.getCompactionState(tableDescriptor.getTableName()) != CompactionState.NONE) {
|
||||||
|
Thread.sleep(100);
|
||||||
|
}
|
||||||
|
|
||||||
|
num = getNumberOfMobFiles(conf, table.getName(), new String(fam));
|
||||||
|
assertEquals(3, num);
|
||||||
|
// We have guarantee, that compcated file discharger will run during this pause
|
||||||
|
// because it has interval less than this wait time
|
||||||
|
LOG.info("Waiting for {}ms", minAgeToArchive + 1000);
|
||||||
|
|
||||||
|
Thread.sleep(minAgeToArchive + 1000);
|
||||||
|
LOG.info("Cleaning up MOB files");
|
||||||
|
|
||||||
|
ServerName serverUsed = null;
|
||||||
|
List<RegionInfo> serverRegions = null;
|
||||||
|
for (ServerName sn : admin.getRegionServers()) {
|
||||||
|
serverRegions = admin.getRegions(sn);
|
||||||
|
if (serverRegions != null && serverRegions.size() > 0) {
|
||||||
|
// filtering out non test table regions
|
||||||
|
serverRegions = serverRegions.stream().filter(r -> r.getTable() == table.getName())
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
// if such one is found use this rs
|
||||||
|
if (serverRegions.size() > 0) {
|
||||||
|
serverUsed = sn;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
chore = HTU.getMiniHBaseCluster().getRegionServer(serverUsed).getRSMobFileCleanerChore();
|
||||||
|
|
||||||
|
chore.chore();
|
||||||
|
|
||||||
|
num = getNumberOfMobFiles(conf, table.getName(), new String(fam));
|
||||||
|
assertEquals(3 - serverRegions.size(), num);
|
||||||
|
|
||||||
|
long scanned = scanTable();
|
||||||
|
assertEquals(20, scanned);
|
||||||
|
|
||||||
|
// creating a MOB file not referenced from the current RS
|
||||||
|
Path extraMOBFile = MobTestUtil.generateMOBFileForRegion(conf, table.getName(),
|
||||||
|
familyDescriptor, "nonExistentRegion");
|
||||||
|
|
||||||
|
// verifying the new MOBfile is added
|
||||||
|
num = getNumberOfMobFiles(conf, table.getName(), new String(fam));
|
||||||
|
assertEquals(4 - serverRegions.size(), num);
|
||||||
|
|
||||||
|
FileSystem fs = FileSystem.get(conf);
|
||||||
|
assertTrue(fs.exists(extraMOBFile));
|
||||||
|
|
||||||
|
LOG.info("Waiting for {}ms", minAgeToArchive + 1000);
|
||||||
|
|
||||||
|
Thread.sleep(minAgeToArchive + 1000);
|
||||||
|
LOG.info("Cleaning up MOB files");
|
||||||
|
|
||||||
|
// running chore again
|
||||||
|
chore.chore();
|
||||||
|
|
||||||
|
// the chore should only archive old MOB files that were referenced from the current RS
|
||||||
|
// the unrelated MOB file is still there
|
||||||
|
num = getNumberOfMobFiles(conf, table.getName(), new String(fam));
|
||||||
|
assertEquals(4 - serverRegions.size(), num);
|
||||||
|
|
||||||
|
assertTrue(fs.exists(extraMOBFile));
|
||||||
|
|
||||||
|
scanned = scanTable();
|
||||||
|
assertEquals(20, scanned);
|
||||||
|
}
|
||||||
|
|
||||||
|
private long getNumberOfMobFiles(Configuration conf, TableName tableName, String family)
|
||||||
|
throws IOException {
|
||||||
|
FileSystem fs = FileSystem.get(conf);
|
||||||
|
Path dir = MobUtils.getMobFamilyPath(conf, tableName, family);
|
||||||
|
FileStatus[] stat = fs.listStatus(dir);
|
||||||
|
for (FileStatus st : stat) {
|
||||||
|
LOG.debug("DDDD MOB Directory content: {} size={}", st.getPath(), st.getLen());
|
||||||
|
}
|
||||||
|
LOG.debug("MOB Directory content total files: {}", stat.length);
|
||||||
|
|
||||||
|
return stat.length;
|
||||||
|
}
|
||||||
|
|
||||||
|
private long scanTable() {
|
||||||
|
try {
|
||||||
|
|
||||||
|
Result result;
|
||||||
|
ResultScanner scanner = table.getScanner(fam);
|
||||||
|
long counter = 0;
|
||||||
|
while ((result = scanner.next()) != null) {
|
||||||
|
assertTrue(Arrays.equals(result.getValue(fam, qualifier), mobVal));
|
||||||
|
counter++;
|
||||||
|
}
|
||||||
|
return counter;
|
||||||
|
} catch (Exception e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
LOG.error("MOB file cleaner chore test FAILED");
|
||||||
|
if (HTU != null) {
|
||||||
|
assertTrue(false);
|
||||||
|
} else {
|
||||||
|
System.exit(-1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue