From 5cf728da5a0fea5fd94bb31f06c9a43158ace6b4 Mon Sep 17 00:00:00 2001 From: BukrosSzabolcs Date: Tue, 21 Jun 2022 11:18:55 +0200 Subject: [PATCH] HBASE-26969:Eliminate MOB renames when SFT is enabled (#4418) Signed-off-by: Wellington Chevreuil --- .../hbase/mob/DefaultMobStoreCompactor.java | 29 +- .../hbase/mob/DefaultMobStoreFlusher.java | 16 +- .../hadoop/hbase/mob/MobFileCleanerChore.java | 44 ++- .../org/apache/hadoop/hbase/mob/MobUtils.java | 31 +- .../hbase/mob/RSMobFileCleanerChore.java | 272 ++++++++++++++++++ .../hadoop/hbase/regionserver/HMobStore.java | 39 ++- .../hbase/regionserver/HRegionServer.java | 15 + .../hadoop/hbase/regionserver/HStore.java | 2 +- .../regionserver/compactions/Compactor.java | 23 +- .../hbase/mob/FaultyMobStoreCompactor.java | 6 +- .../apache/hadoop/hbase/mob/MobTestUtil.java | 22 ++ .../hbase/mob/TestDefaultMobStoreFlusher.java | 38 ++- .../hbase/mob/TestMobCompactionOptMode.java | 11 +- .../TestMobCompactionOptRegionBatchMode.java | 14 +- ...stMobCompactionRegularRegionBatchMode.java | 13 +- .../mob/TestMobCompactionWithDefaults.java | 57 ++-- .../hbase/mob/TestMobFileCleanerChore.java | 29 +- .../hbase/mob/TestMobStoreCompaction.java | 29 +- .../apache/hadoop/hbase/mob/TestMobUtils.java | 5 + .../hbase/mob/TestRSMobFileCleanerChore.java | 263 +++++++++++++++++ 20 files changed, 860 insertions(+), 98 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/mob/RSMobFileCleanerChore.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestRSMobFileCleanerChore.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java index 57a991e45f3..f568af0b19a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java @@ -293,20 
+293,20 @@ public class DefaultMobStoreCompactor extends DefaultCompactor { * @param cleanSeqId When true, remove seqId(used to be mvcc) value which is <= * smallestReadPoint * @param throughputController The compaction throughput controller. - * @param major Is a major compaction. - * @param numofFilesToCompact the number of files to compact + * @param request compaction request. * @param progress Progress reporter. * @return Whether compaction ended; false if it was interrupted for any reason. */ @Override protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer, long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController, - boolean major, int numofFilesToCompact, CompactionProgress progress) throws IOException { + CompactionRequestImpl request, CompactionProgress progress) throws IOException { long bytesWrittenProgressForLog = 0; long bytesWrittenProgressForShippedCall = 0; // Clear old mob references mobRefSet.get().clear(); boolean isUserRequest = userRequest.get(); + boolean major = request.isAllFiles(); boolean compactMOBs = major && isUserRequest; boolean discardMobMiss = conf.getBoolean(MobConstants.MOB_UNSAFE_DISCARD_MISS_KEY, MobConstants.DEFAULT_MOB_DISCARD_MISS); @@ -350,12 +350,12 @@ public class DefaultMobStoreCompactor extends DefaultCompactor { throughputController.start(compactionName); KeyValueScanner kvs = (scanner instanceof KeyValueScanner) ? 
(KeyValueScanner) scanner : null; long shippedCallSizeLimit = - (long) numofFilesToCompact * this.store.getColumnFamilyDescriptor().getBlocksize(); + (long) request.getFiles().size() * this.store.getColumnFamilyDescriptor().getBlocksize(); Cell mobCell = null; try { - mobFileWriter = newMobWriter(fd, major); + mobFileWriter = newMobWriter(fd, major, request.getWriterCreationTracker()); fileName = Bytes.toBytes(mobFileWriter.getPath().getName()); do { @@ -435,7 +435,7 @@ public class DefaultMobStoreCompactor extends DefaultCompactor { LOG.debug("Closing output MOB File, length={} file={}, store={}", len, mobFileWriter.getPath().getName(), getStoreInfo()); commitOrAbortMobWriter(mobFileWriter, fd.maxSeqId, mobCells, major); - mobFileWriter = newMobWriter(fd, major); + mobFileWriter = newMobWriter(fd, major, request.getWriterCreationTracker()); fileName = Bytes.toBytes(mobFileWriter.getPath().getName()); mobCells = 0; } @@ -479,7 +479,7 @@ public class DefaultMobStoreCompactor extends DefaultCompactor { long len = mobFileWriter.getPos(); if (len > maxMobFileSize) { commitOrAbortMobWriter(mobFileWriter, fd.maxSeqId, mobCells, major); - mobFileWriter = newMobWriter(fd, major); + mobFileWriter = newMobWriter(fd, major, request.getWriterCreationTracker()); fileName = Bytes.toBytes(mobFileWriter.getPath().getName()); mobCells = 0; } @@ -531,7 +531,7 @@ public class DefaultMobStoreCompactor extends DefaultCompactor { long len = mobFileWriter.getPos(); if (len > maxMobFileSize) { commitOrAbortMobWriter(mobFileWriter, fd.maxSeqId, mobCells, major); - mobFileWriter = newMobWriter(fd, major); + mobFileWriter = newMobWriter(fd, major, request.getWriterCreationTracker()); fileName = Bytes.toBytes(mobFileWriter.getPath().getName()); mobCells = 0; } @@ -617,11 +617,16 @@ public class DefaultMobStoreCompactor extends DefaultCompactor { } } - private StoreFileWriter newMobWriter(FileDetails fd, boolean major) throws IOException { + private StoreFileWriter newMobWriter(FileDetails fd, 
boolean major, + Consumer writerCreationTracker) throws IOException { try { - StoreFileWriter mobFileWriter = mobStore.createWriterInTmp(new Date(fd.latestPutTs), - fd.maxKeyCount, major ? majorCompactionCompression : minorCompactionCompression, - store.getRegionInfo().getStartKey(), true); + StoreFileWriter mobFileWriter = mobStore.getStoreEngine().requireWritingToTmpDirFirst() + ? mobStore.createWriterInTmp(new Date(fd.latestPutTs), fd.maxKeyCount, + major ? majorCompactionCompression : minorCompactionCompression, + store.getRegionInfo().getStartKey(), true) + : mobStore.createWriter(new Date(fd.latestPutTs), fd.maxKeyCount, + major ? majorCompactionCompression : minorCompactionCompression, + store.getRegionInfo().getStartKey(), true, writerCreationTracker); LOG.debug("New MOB writer created={} store={}", mobFileWriter.getPath().getName(), getStoreInfo()); // Add reference we get for compact MOB diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java index a7f2ecdf242..6df17e58e22 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java @@ -127,7 +127,8 @@ public class DefaultMobStoreFlusher extends DefaultStoreFlusher { try { // It's a mob store, flush the cells in a mob way. This is the difference of flushing // between a normal and a mob store. 
- performMobFlush(snapshot, cacheFlushId, scanner, writer, status, throughputController); + performMobFlush(snapshot, cacheFlushId, scanner, writer, status, throughputController, + writerCreationTracker); } catch (IOException ioe) { e = ioe; // throw the exception out @@ -171,16 +172,21 @@ public class DefaultMobStoreFlusher extends DefaultStoreFlusher { */ protected void performMobFlush(MemStoreSnapshot snapshot, long cacheFlushId, InternalScanner scanner, StoreFileWriter writer, MonitoredTask status, - ThroughputController throughputController) throws IOException { + ThroughputController throughputController, Consumer writerCreationTracker) + throws IOException { StoreFileWriter mobFileWriter = null; int compactionKVMax = conf.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT); long mobCount = 0; long mobSize = 0; long time = snapshot.getTimeRangeTracker().getMax(); - mobFileWriter = mobStore.createWriterInTmp(new Date(time), snapshot.getCellsCount(), - store.getColumnFamilyDescriptor().getCompressionType(), store.getRegionInfo().getStartKey(), - false); + mobFileWriter = mobStore.getStoreEngine().requireWritingToTmpDirFirst() + ? 
mobStore.createWriterInTmp(new Date(time), snapshot.getCellsCount(), + store.getColumnFamilyDescriptor().getCompressionType(), store.getRegionInfo().getStartKey(), + false) + : mobStore.createWriter(new Date(time), snapshot.getCellsCount(), + store.getColumnFamilyDescriptor().getCompressionType(), store.getRegionInfo().getStartKey(), + false, writerCreationTracker); // the target path is {tableName}/.mob/{cfName}/mobFiles // the relative path is mobFiles byte[] fileName = Bytes.toBytes(mobFileWriter.getPath().getName()); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCleanerChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCleanerChore.java index 1258a17a8eb..2c78c6f5ac7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCleanerChore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCleanerChore.java @@ -160,14 +160,15 @@ public class MobFileCleanerChore extends ScheduledChore { maxCreationTimeToArchive, table); } + FileSystem fs = FileSystem.get(conf); + Set regionNames = new HashSet<>(); Path rootDir = CommonFSUtils.getRootDir(conf); Path tableDir = CommonFSUtils.getTableDir(rootDir, table); - // How safe is this call? 
- List regionDirs = FSUtils.getRegionDirs(FileSystem.get(conf), tableDir); + List regionDirs = FSUtils.getRegionDirs(fs, tableDir); Set allActiveMobFileName = new HashSet(); - FileSystem fs = FileSystem.get(conf); for (Path regionPath : regionDirs) { + regionNames.add(regionPath.getName()); for (ColumnFamilyDescriptor hcd : list) { String family = hcd.getNameAsString(); Path storePath = new Path(regionPath, family); @@ -195,13 +196,26 @@ public class MobFileCleanerChore extends ScheduledChore { for (Path pp : storeFiles) { currentPath = pp; LOG.trace("Store file: {}", pp); - HStoreFile sf = - new HStoreFile(fs, pp, conf, CacheConfig.DISABLED, BloomType.NONE, true); - sf.initReader(); - byte[] mobRefData = sf.getMetadataValue(HStoreFile.MOB_FILE_REFS); - byte[] bulkloadMarkerData = sf.getMetadataValue(HStoreFile.BULKLOAD_TASK_KEY); - // close store file to avoid memory leaks - sf.closeStoreFile(true); + HStoreFile sf = null; + byte[] mobRefData = null; + byte[] bulkloadMarkerData = null; + try { + sf = new HStoreFile(fs, pp, conf, CacheConfig.DISABLED, BloomType.NONE, true); + sf.initReader(); + mobRefData = sf.getMetadataValue(HStoreFile.MOB_FILE_REFS); + bulkloadMarkerData = sf.getMetadataValue(HStoreFile.BULKLOAD_TASK_KEY); + // close store file to avoid memory leaks + sf.closeStoreFile(true); + } catch (IOException ex) { + // When FileBased SFT is active the store dir can contain corrupted or incomplete + // files. So read errors are expected. We just skip these files. 
+ if (ex instanceof FileNotFoundException) { + throw ex; + } + LOG.debug("Failed to get mob data from file: {} due to error.", pp.toString(), + ex); + continue; + } if (mobRefData == null) { if (bulkloadMarkerData == null) { LOG.warn("Found old store file with no MOB_FILE_REFS: {} - " @@ -256,9 +270,11 @@ public class MobFileCleanerChore extends ScheduledChore { while (rit.hasNext()) { LocatedFileStatus lfs = rit.next(); Path p = lfs.getPath(); - if (!allActiveMobFileName.contains(p.getName())) { - // MOB is not in a list of active references, but it can be too - // fresh, skip it in this case + String[] mobParts = p.getName().split("_"); + String regionName = mobParts[mobParts.length - 1]; + + if (!regionNames.contains(regionName)) { + // MOB belonged to a region no longer hosted long creationTime = fs.getFileStatus(p).getModificationTime(); if (creationTime < maxCreationTimeToArchive) { LOG.trace("Archiving MOB file {} creation time={}", p, @@ -269,7 +285,7 @@ public class MobFileCleanerChore extends ScheduledChore { fs.getFileStatus(p).getModificationTime()); } } else { - LOG.trace("Keeping active MOB file: {}", p); + LOG.trace("Keeping MOB file with existing region: {}", p); } } LOG.info(" MOB Cleaner found {} files to archive for table={} family={}", toArchive.size(), diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java index 5e2ee9eb411..43cf4255235 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java @@ -33,6 +33,7 @@ import java.util.Date; import java.util.List; import java.util.Optional; import java.util.UUID; +import java.util.function.Consumer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -584,6 +585,33 @@ public final class MobUtils { CacheConfig cacheConfig, 
Encryption.Context cryptoContext, ChecksumType checksumType, int bytesPerChecksum, int blocksize, BloomType bloomType, boolean isCompaction) throws IOException { + return createWriter(conf, fs, family, path, maxKeyCount, compression, cacheConfig, + cryptoContext, checksumType, bytesPerChecksum, blocksize, bloomType, isCompaction, null); + } + + /** + * Creates a writer for the mob file in temp directory. + * @param conf The current configuration. + * @param fs The current file system. + * @param family The descriptor of the current column family. + * @param path The path for a temp directory. + * @param maxKeyCount The key count. + * @param compression The compression algorithm. + * @param cacheConfig The current cache config. + * @param cryptoContext The encryption context. + * @param checksumType The checksum type. + * @param bytesPerChecksum The bytes per checksum. + * @param blocksize The HFile block size. + * @param bloomType The bloom filter type. + * @param isCompaction If the writer is used in compaction. + * @param writerCreationTracker to track the current writer in the store + * @return The writer for the mob file. 
+ */ + public static StoreFileWriter createWriter(Configuration conf, FileSystem fs, + ColumnFamilyDescriptor family, Path path, long maxKeyCount, Compression.Algorithm compression, + CacheConfig cacheConfig, Encryption.Context cryptoContext, ChecksumType checksumType, + int bytesPerChecksum, int blocksize, BloomType bloomType, boolean isCompaction, + Consumer writerCreationTracker) throws IOException { if (compression == null) { compression = HFile.DEFAULT_COMPRESSION_ALGORITHM; } @@ -602,7 +630,8 @@ public final class MobUtils { .withCreateTime(EnvironmentEdgeManager.currentTime()).build(); StoreFileWriter w = new StoreFileWriter.Builder(conf, writerCacheConf, fs).withFilePath(path) - .withBloomType(bloomType).withMaxKeyCount(maxKeyCount).withFileContext(hFileContext).build(); + .withBloomType(bloomType).withMaxKeyCount(maxKeyCount).withFileContext(hFileContext) + .withWriterCreationTracker(writerCreationTracker).build(); return w; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/RSMobFileCleanerChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/RSMobFileCleanerChore.java new file mode 100644 index 00000000000..06e34988733 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/RSMobFileCleanerChore.java @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mob; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.hbase.ScheduledChore; +import org.apache.hadoop.hbase.TableDescriptors; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.backup.HFileArchiver; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.HStore; +import org.apache.hadoop.hbase.regionserver.HStoreFile; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.CommonFSUtils; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.common.collect.SetMultimap; + +/** + * The class RSMobFileCleanerChore for running 
cleaner regularly to remove the obsolete (files which + * have no active references to) mob files that were referenced from the current RS. + */ +@InterfaceAudience.Private +public class RSMobFileCleanerChore extends ScheduledChore { + + private static final Logger LOG = LoggerFactory.getLogger(RSMobFileCleanerChore.class); + private final HRegionServer rs; + + public RSMobFileCleanerChore(HRegionServer rs) { + super(rs.getServerName() + "-MobFileCleanerChore", rs, + rs.getConfiguration().getInt(MobConstants.MOB_CLEANER_PERIOD, + MobConstants.DEFAULT_MOB_CLEANER_PERIOD), + Math.round(rs.getConfiguration().getInt(MobConstants.MOB_CLEANER_PERIOD, + MobConstants.DEFAULT_MOB_CLEANER_PERIOD) + * ((ThreadLocalRandom.current().nextDouble() + 0.5D))), + TimeUnit.SECONDS); + // to prevent a load spike on the fs the initial delay is modified by +/- 50% + this.rs = rs; + } + + public RSMobFileCleanerChore() { + this.rs = null; + } + + @Override + protected void chore() { + + long minAgeToArchive = rs.getConfiguration().getLong(MobConstants.MIN_AGE_TO_ARCHIVE_KEY, + MobConstants.DEFAULT_MIN_AGE_TO_ARCHIVE); + // We check only those MOB files whose creation time is less + // than maxCreationTimeToArchive. This is the current time - 1h. 1 hour gap + // gives us full confidence that all corresponding store files will + // exist at the time cleaning procedure begins and will be examined. + // So, if MOB file creation time is greater than maxCreationTimeToArchive, + // this will be skipped and won't be archived.
+ long maxCreationTimeToArchive = EnvironmentEdgeManager.currentTime() - minAgeToArchive; + + TableDescriptors htds = rs.getTableDescriptors(); + try { + FileSystem fs = FileSystem.get(rs.getConfiguration()); + + Map map = null; + try { + map = htds.getAll(); + } catch (IOException e) { + LOG.error("MobFileCleanerChore failed", e); + return; + } + Map>> referencedMOBs = new HashMap<>(); + for (TableDescriptor htd : map.values()) { + // Now clean obsolete files for a table + LOG.info("Cleaning obsolete MOB files from table={}", htd.getTableName()); + List list = MobUtils.getMobColumnFamilies(htd); + List regions = rs.getRegions(htd.getTableName()); + for (HRegion region : regions) { + for (ColumnFamilyDescriptor hcd : list) { + HStore store = region.getStore(hcd.getName()); + Collection sfs = store.getStorefiles(); + Set regionMobs = new HashSet(); + Path currentPath = null; + try { + // collecting referenced MOBs + for (HStoreFile sf : sfs) { + currentPath = sf.getPath(); + sf.initReader(); + byte[] mobRefData = sf.getMetadataValue(HStoreFile.MOB_FILE_REFS); + byte[] bulkloadMarkerData = sf.getMetadataValue(HStoreFile.BULKLOAD_TASK_KEY); + // close store file to avoid memory leaks + sf.closeStoreFile(true); + if (mobRefData == null) { + if (bulkloadMarkerData == null) { + LOG.warn( + "Found old store file with no MOB_FILE_REFS: {} - " + + "can not proceed until all old files will be MOB-compacted.", + currentPath); + return; + } else { + LOG.debug("Skipping file without MOB references (bulkloaded file):{}", + currentPath); + continue; + } + } + // file may or may not have MOB references, but was created by the distributed + // mob compaction code.
+ try { + SetMultimap mobs = + MobUtils.deserializeMobFileRefs(mobRefData).build(); + LOG.debug("Found {} mob references for store={}", mobs.size(), sf); + LOG.trace("Specific mob references found for store={} : {}", sf, mobs); + regionMobs.addAll(mobs.values()); + } catch (RuntimeException exception) { + throw new IOException("failure getting mob references for hfile " + sf, + exception); + } + } + // collecting files, MOB included currently being written + regionMobs.addAll(store.getStoreFilesBeingWritten().stream() + .map(path -> path.getName()).collect(Collectors.toList())); + + referencedMOBs + .computeIfAbsent(hcd.getNameAsString(), cf -> new HashMap>()) + .computeIfAbsent(region.getRegionInfo().getEncodedName(), name -> new ArrayList<>()) + .addAll(regionMobs); + + } catch (FileNotFoundException e) { + LOG.warn( + "Missing file:{} Starting MOB cleaning cycle from the beginning" + " due to error", + currentPath, e); + regionMobs.clear(); + continue; + } catch (IOException e) { + LOG.error("Failed to clean the obsolete mob files for table={}", + htd.getTableName().getNameAsString(), e); + } + } + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Found: {} active mob refs for table={}", + referencedMOBs.values().stream().map(inner -> inner.values()) + .flatMap(lists -> lists.stream()).mapToInt(lists -> lists.size()).sum(), + htd.getTableName().getNameAsString()); + } + if (LOG.isTraceEnabled()) { + referencedMOBs.values().stream().forEach(innerMap -> innerMap.values().stream() + .forEach(mobFileList -> mobFileList.stream().forEach(LOG::trace))); + } + + // collect regions referencing MOB files belonging to the current rs + Set regionsCovered = new HashSet<>(); + referencedMOBs.values().stream() + .forEach(regionMap -> regionsCovered.addAll(regionMap.keySet())); + + for (ColumnFamilyDescriptor hcd : list) { + List toArchive = new ArrayList(); + String family = hcd.getNameAsString(); + Path dir = MobUtils.getMobFamilyPath(rs.getConfiguration(), htd.getTableName(), 
family); + RemoteIterator rit = fs.listLocatedStatus(dir); + while (rit.hasNext()) { + LocatedFileStatus lfs = rit.next(); + Path p = lfs.getPath(); + String[] mobParts = p.getName().split("_"); + String regionName = mobParts[mobParts.length - 1]; + + // skip MOB files not belonging to a region assigned to the current rs + if (!regionsCovered.contains(regionName)) { + LOG.trace("MOB file does not belong to current rs: {}", p); + continue; + } + + // check active or actively written mob files + Map> cfMobs = referencedMOBs.get(hcd.getNameAsString()); + if ( + cfMobs != null && cfMobs.get(regionName) != null + && cfMobs.get(regionName).contains(p.getName()) + ) { + LOG.trace("Keeping active MOB file: {}", p); + continue; + } + + // MOB is not in a list of active references, but it can be too + // fresh, skip it in this case + long creationTime = fs.getFileStatus(p).getModificationTime(); + if (creationTime < maxCreationTimeToArchive) { + LOG.trace("Archiving MOB file {} creation time={}", p, + (fs.getFileStatus(p).getModificationTime())); + toArchive.add(p); + } else { + LOG.trace("Skipping fresh file: {}. Creation time={}", p, + fs.getFileStatus(p).getModificationTime()); + } + + } + LOG.info(" MOB Cleaner found {} files to archive for table={} family={}", + toArchive.size(), htd.getTableName().getNameAsString(), family); + archiveMobFiles(rs.getConfiguration(), htd.getTableName(), family.getBytes(), toArchive); + LOG.info(" MOB Cleaner archived {} files, table={} family={}", toArchive.size(), + htd.getTableName().getNameAsString(), family); + } + + LOG.info("Cleaning obsolete MOB files finished for table={}", htd.getTableName()); + + } + } catch (IOException e) { + LOG.error("MOB Cleaner failed when trying to access the file system", e); + } + } + + /** + * Archives the mob files. + * @param conf The current configuration. + * @param tableName The table name. + * @param family The name of the column family. + * @param storeFiles The files to be archived. 
+ * @throws IOException exception + */ + public void archiveMobFiles(Configuration conf, TableName tableName, byte[] family, + List storeFiles) throws IOException { + + if (storeFiles.size() == 0) { + // nothing to remove + LOG.debug("Skipping archiving old MOB files - no files found for table={} cf={}", tableName, + Bytes.toString(family)); + return; + } + Path mobTableDir = CommonFSUtils.getTableDir(MobUtils.getMobHome(conf), tableName); + FileSystem fs = storeFiles.get(0).getFileSystem(conf); + + for (Path p : storeFiles) { + LOG.debug("MOB Cleaner is archiving: {}", p); + HFileArchiver.archiveStoreFile(conf, fs, MobUtils.getMobRegionInfo(tableName), mobTableDir, + family, p); + } + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java index 931357a9fc5..13b7cc022bb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java @@ -28,6 +28,7 @@ import java.util.Optional; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -184,7 +185,27 @@ public class HMobStore extends HStore { } Path path = getTempDir(); return createWriterInTmp(MobUtils.formatDate(date), path, maxKeyCount, compression, startKey, - isCompaction); + isCompaction, null); + } + + /** + * Creates the writer for the mob file in the mob family directory. + * @param date The latest date of written cells. + * @param maxKeyCount The key count. + * @param compression The compression algorithm. + * @param startKey The start key. + * @param isCompaction If the writer is used in compaction. + * @return The writer for the mob file. 
n + */ + public StoreFileWriter createWriter(Date date, long maxKeyCount, + Compression.Algorithm compression, byte[] startKey, boolean isCompaction, + Consumer writerCreationTracker) throws IOException { + if (startKey == null) { + startKey = HConstants.EMPTY_START_ROW; + } + Path path = getPath(); + return createWriterInTmp(MobUtils.formatDate(date), path, maxKeyCount, compression, startKey, + isCompaction, writerCreationTracker); } /** @@ -198,11 +219,13 @@ public class HMobStore extends HStore { * @return The writer for the mob file. n */ public StoreFileWriter createWriterInTmp(String date, Path basePath, long maxKeyCount, - Compression.Algorithm compression, byte[] startKey, boolean isCompaction) throws IOException { + Compression.Algorithm compression, byte[] startKey, boolean isCompaction, + Consumer writerCreationTracker) throws IOException { MobFileName mobFileName = MobFileName.create(startKey, date, UUID.randomUUID().toString().replaceAll("-", ""), getHRegion().getRegionInfo().getEncodedName()); - return createWriterInTmp(mobFileName, basePath, maxKeyCount, compression, isCompaction); + return createWriterInTmp(mobFileName, basePath, maxKeyCount, compression, isCompaction, + writerCreationTracker); } /** @@ -214,13 +237,15 @@ public class HMobStore extends HStore { * @param isCompaction If the writer is used in compaction. * @return The writer for the mob file. 
n */ + public StoreFileWriter createWriterInTmp(MobFileName mobFileName, Path basePath, long maxKeyCount, - Compression.Algorithm compression, boolean isCompaction) throws IOException { + Compression.Algorithm compression, boolean isCompaction, Consumer writerCreationTracker) + throws IOException { return MobUtils.createWriter(conf, getFileSystem(), getColumnFamilyDescriptor(), new Path(basePath, mobFileName.getFileName()), maxKeyCount, compression, getCacheConfig(), getStoreContext().getEncryptionContext(), StoreUtils.getChecksumType(conf), StoreUtils.getBytesPerChecksum(conf), getStoreContext().getBlockSize(), BloomType.NONE, - isCompaction); + isCompaction, writerCreationTracker); } /** @@ -234,6 +259,10 @@ public class HMobStore extends HStore { } Path dstPath = new Path(targetPath, sourceFile.getName()); validateMobFile(sourceFile); + if (sourceFile.equals(targetPath)) { + LOG.info("File is already in the destination dir: {}", sourceFile); + return; + } LOG.info(" FLUSH Renaming flushed file from {} to {}", sourceFile, dstPath); Path parent = dstPath.getParent(); if (!getFileSystem().exists(parent)) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 1865929dc5e..e79f4bec612 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -108,6 +108,7 @@ import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; import org.apache.hadoop.hbase.ipc.ServerRpcController; import org.apache.hadoop.hbase.log.HBaseMarkers; import org.apache.hadoop.hbase.mob.MobFileCache; +import org.apache.hadoop.hbase.mob.RSMobFileCleanerChore; import org.apache.hadoop.hbase.monitoring.TaskMonitor; import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder; import org.apache.hadoop.hbase.namequeues.SlowLogTableOpsChore; 
@@ -438,6 +439,8 @@ public class HRegionServer extends HBaseServerBase private BrokenStoreFileCleaner brokenStoreFileCleaner; + private RSMobFileCleanerChore rsMobFileCleanerChore; + @InterfaceAudience.Private CompactedHFilesDischarger compactedFileDischarger; @@ -1898,6 +1901,10 @@ public class HRegionServer extends HBaseServerBase choreService.scheduleChore(brokenStoreFileCleaner); } + if (this.rsMobFileCleanerChore != null) { + choreService.scheduleChore(rsMobFileCleanerChore); + } + // Leases is not a Thread. Internally it runs a daemon thread. If it gets // an unhandled exception, it will just exit. Threads.setDaemonThreadRunning(this.leaseManager, getName() + ".leaseChecker", @@ -1994,6 +2001,8 @@ public class HRegionServer extends HBaseServerBase new BrokenStoreFileCleaner((int) (brokenStoreFileCleanerDelay + jitterValue), brokenStoreFileCleanerPeriod, this, conf, this); + this.rsMobFileCleanerChore = new RSMobFileCleanerChore(this); + registerConfigurationObservers(); } @@ -3550,6 +3559,11 @@ public class HRegionServer extends HBaseServerBase return brokenStoreFileCleaner; } + @InterfaceAudience.Private + public RSMobFileCleanerChore getRSMobFileCleanerChore() { + return rsMobFileCleanerChore; + } + RSSnapshotVerifier getRsSnapshotVerifier() { return rsSnapshotVerifier; } @@ -3566,6 +3580,7 @@ public class HRegionServer extends HBaseServerBase shutdownChore(fsUtilizationChore); shutdownChore(slowLogTableOpsChore); shutdownChore(brokenStoreFileCleaner); + shutdownChore(rsMobFileCleanerChore); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index 020009c7f5c..33c2910fcea 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -2432,7 +2432,7 @@ public class HStore * {@link BrokenStoreFileCleaner} to prevent 
deleting the these files as they are not present in * SFT yet. */ - Set getStoreFilesBeingWritten() { + public Set getStoreFilesBeingWritten() { return storeFileWriterCreationTrackers.stream().flatMap(t -> t.get().stream()) .collect(Collectors.toSet()); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java index e66a3e05a42..bd9ce6035ad 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java @@ -362,7 +362,7 @@ public abstract class Compactor { writer = sinkFactory.createWriter(scanner, fd, dropCache, request.isMajor(), request.getWriterCreationTracker()); finished = performCompaction(fd, scanner, writer, smallestReadPoint, cleanSeqId, - throughputController, request.isAllFiles(), request.getFiles().size(), progress); + throughputController, request, progress); if (!finished) { throw new InterruptedIOException("Aborting compaction of store " + store + " in region " + store.getRegionInfo().getRegionNameAsString() + " because it was interrupted."); @@ -401,20 +401,19 @@ public abstract class Compactor { /** * Performs the compaction. - * @param fd FileDetails of cell sink writer - * @param scanner Where to read from. - * @param writer Where to write to. - * @param smallestReadPoint Smallest read point. - * @param cleanSeqId When true, remove seqId(used to be mvcc) value which is <= - * smallestReadPoint - * @param major Is a major compaction. - * @param numofFilesToCompact the number of files to compact - * @param progress Progress reporter. + * @param fd FileDetails of cell sink writer + * @param scanner Where to read from. + * @param writer Where to write to. + * @param smallestReadPoint Smallest read point. 
+ * @param cleanSeqId When true, remove seqId(used to be mvcc) value which is <= + * smallestReadPoint + * @param request compaction request. + * @param progress Progress reporter. * @return Whether compaction ended; false if it was interrupted for some reason. */ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer, long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController, - boolean major, int numofFilesToCompact, CompactionProgress progress) throws IOException { + CompactionRequestImpl request, CompactionProgress progress) throws IOException { assert writer instanceof ShipperListener; long bytesWrittenProgressForLog = 0; long bytesWrittenProgressForShippedCall = 0; @@ -436,7 +435,7 @@ public abstract class Compactor { throughputController.start(compactionName); KeyValueScanner kvs = (scanner instanceof KeyValueScanner) ? (KeyValueScanner) scanner : null; long shippedCallSizeLimit = - (long) numofFilesToCompact * this.store.getColumnFamilyDescriptor().getBlocksize(); + (long) request.getFiles().size() * this.store.getColumnFamilyDescriptor().getBlocksize(); try { do { hasMore = scanner.next(cells, scannerContext); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/FaultyMobStoreCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/FaultyMobStoreCompactor.java index af45263a17c..96675fb69e5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/FaultyMobStoreCompactor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/FaultyMobStoreCompactor.java @@ -44,6 +44,7 @@ import org.apache.hadoop.hbase.regionserver.ShipperListener; import org.apache.hadoop.hbase.regionserver.StoreFileWriter; import org.apache.hadoop.hbase.regionserver.compactions.CloseChecker; import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl; import 
org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil; import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; import org.apache.hadoop.hbase.util.Bytes; @@ -92,8 +93,9 @@ public class FaultyMobStoreCompactor extends DefaultMobStoreCompactor { @Override protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer, long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController, - boolean major, int numofFilesToCompact, CompactionProgress progress) throws IOException { + CompactionRequestImpl request, CompactionProgress progress) throws IOException { + boolean major = request.isAllFiles(); totalCompactions.incrementAndGet(); if (major) { totalMajorCompactions.incrementAndGet(); @@ -145,7 +147,7 @@ public class FaultyMobStoreCompactor extends DefaultMobStoreCompactor { throughputController.start(compactionName); KeyValueScanner kvs = (scanner instanceof KeyValueScanner) ? (KeyValueScanner) scanner : null; long shippedCallSizeLimit = - (long) numofFilesToCompact * this.store.getColumnFamilyDescriptor().getBlocksize(); + (long) request.getFiles().size() * this.store.getColumnFamilyDescriptor().getBlocksize(); Cell mobCell = null; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/MobTestUtil.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/MobTestUtil.java index 2753ba33af4..9b51c22db19 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/MobTestUtil.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/MobTestUtil.java @@ -18,17 +18,26 @@ package org.apache.hadoop.hbase.mob; import java.io.IOException; +import java.util.Date; import java.util.List; import java.util.Random; import java.util.concurrent.ThreadLocalRandom; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import 
org.apache.hadoop.hbase.HBaseTestingUtil; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.io.compress.Compression; +import org.apache.hadoop.hbase.io.crypto.Encryption; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.regionserver.StoreFileWriter; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @@ -112,4 +121,17 @@ public class MobTestUtil { scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE)); return HBaseTestingUtil.countRows(table, scan); } + + public static Path generateMOBFileForRegion(Configuration conf, TableName tableName, + ColumnFamilyDescriptor familyDescriptor, String regionName) throws IOException { + Date date = new Date(); + String dateStr = MobUtils.formatDate(date); + FileSystem fs = FileSystem.get(conf); + Path cfMOBDir = MobUtils.getMobFamilyPath(conf, tableName, familyDescriptor.getNameAsString()); + StoreFileWriter writer = MobUtils.createWriter(conf, fs, familyDescriptor, dateStr, cfMOBDir, + 1000L, Compression.Algorithm.NONE, "startKey", CacheConfig.DISABLED, Encryption.Context.NONE, + false, ""); + writer.close(); + return writer.getPath(); + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestDefaultMobStoreFlusher.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestDefaultMobStoreFlusher.java index 70e7bbf4cb4..6b7ec952d05 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestDefaultMobStoreFlusher.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestDefaultMobStoreFlusher.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hbase.mob; +import 
java.util.Arrays; +import java.util.Collection; import java.util.List; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; @@ -31,17 +33,21 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.util.Bytes; -import org.junit.AfterClass; +import org.junit.After; import org.junit.Assert; -import org.junit.BeforeClass; +import org.junit.Before; import org.junit.ClassRule; import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.rules.TestName; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +@RunWith(Parameterized.class) @Category(LargeTests.class) public class TestDefaultMobStoreFlusher { @@ -61,19 +67,35 @@ public class TestDefaultMobStoreFlusher { @Rule public TestName name = new TestName(); - @BeforeClass - public static void setUpBeforeClass() throws Exception { + protected Boolean useFileBasedSFT; + + public TestDefaultMobStoreFlusher(Boolean useFileBasedSFT) { + this.useFileBasedSFT = useFileBasedSFT; + } + + @Parameterized.Parameters + public static Collection data() { + Boolean[] data = { false, true }; + return Arrays.asList(data); + } + + @Before + public void setUpBefore() throws Exception { + if (useFileBasedSFT) { + TEST_UTIL.getConfiguration().set(StoreFileTrackerFactory.TRACKER_IMPL, + "org.apache.hadoop.hbase.regionserver.storefiletracker.FileBasedStoreFileTracker"); + } TEST_UTIL.startMiniCluster(1); } - @AfterClass - public static void tearDownAfterClass() throws Exception { + @After + public void tearDownAfter() throws Exception { TEST_UTIL.shutdownMiniCluster(); } @Test public void testFlushNonMobFile() throws 
Exception { - final TableName tableName = TableName.valueOf(name.getMethodName()); + final TableName tableName = TableName.valueOf(TestMobUtils.getTableName(name)); TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName) .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(4).build()) .build(); @@ -82,7 +104,7 @@ public class TestDefaultMobStoreFlusher { @Test public void testFlushMobFile() throws Exception { - final TableName tableName = TableName.valueOf(name.getMethodName()); + final TableName tableName = TableName.valueOf(TestMobUtils.getTableName(name)); TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName) .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(family).setMobEnabled(true) .setMobThreshold(3L).setMaxVersions(4).build()) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionOptMode.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionOptMode.java index 5f3cc629959..1c586bbd10c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionOptMode.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionOptMode.java @@ -17,10 +17,8 @@ */ package org.apache.hadoop.hbase.mob; -import java.io.IOException; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.testclassification.LargeTests; -import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.experimental.categories.Category; @@ -41,12 +39,13 @@ public class TestMobCompactionOptMode extends TestMobCompactionWithDefaults { public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestMobCompactionOptMode.class); - @BeforeClass - public static void configureOptimizedCompaction() throws InterruptedException, IOException { - HTU.shutdownMiniHBaseCluster(); + public TestMobCompactionOptMode(Boolean useFileBasedSFT) { + super(useFileBasedSFT); + } + + 
protected void additonalConfigSetup() { conf.set(MobConstants.MOB_COMPACTION_TYPE_KEY, MobConstants.OPTIMIZED_MOB_COMPACTION_TYPE); conf.setLong(MobConstants.MOB_COMPACTION_MAX_FILE_SIZE_KEY, 1000000); - HTU.startMiniHBaseCluster(); } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionOptRegionBatchMode.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionOptRegionBatchMode.java index 7b6b44d0e31..dfd6435d364 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionOptRegionBatchMode.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionOptRegionBatchMode.java @@ -23,9 +23,10 @@ import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.junit.Before; -import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,6 +40,7 @@ import org.slf4j.LoggerFactory; * time larger than minimum age to archive 10. Runs Mob cleaner chore 11 Verifies that number of MOB * files in a mob directory is 20. 12 Runs scanner and checks all 3 * 1000 rows. 
*/ +@RunWith(Parameterized.class) @Category(LargeTests.class) public class TestMobCompactionOptRegionBatchMode extends TestMobCompactionWithDefaults { private static final Logger LOG = @@ -50,20 +52,20 @@ public class TestMobCompactionOptRegionBatchMode extends TestMobCompactionWithDe private static final int batchSize = 7; private MobFileCompactionChore compactionChore; + public TestMobCompactionOptRegionBatchMode(Boolean useFileBasedSFT) { + super(useFileBasedSFT); + } + @Before public void setUp() throws Exception { super.setUp(); compactionChore = new MobFileCompactionChore(conf, batchSize); } - @BeforeClass - public static void configureOptimizedCompactionAndBatches() - throws InterruptedException, IOException { - HTU.shutdownMiniHBaseCluster(); + protected void additonalConfigSetup() { conf.setInt(MobConstants.MOB_MAJOR_COMPACTION_REGION_BATCH_SIZE, batchSize); conf.set(MobConstants.MOB_COMPACTION_TYPE_KEY, MobConstants.OPTIMIZED_MOB_COMPACTION_TYPE); conf.setLong(MobConstants.MOB_COMPACTION_MAX_FILE_SIZE_KEY, 1000000); - HTU.startMiniHBaseCluster(); } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionRegularRegionBatchMode.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionRegularRegionBatchMode.java index 3d6eaa0a25a..5e2806bdee2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionRegularRegionBatchMode.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionRegularRegionBatchMode.java @@ -23,9 +23,10 @@ import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.junit.Before; -import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import org.slf4j.Logger; import 
org.slf4j.LoggerFactory; @@ -39,6 +40,7 @@ import org.slf4j.LoggerFactory; * to archive 10. Runs Mob cleaner chore 11 Verifies that number of MOB files in a mob directory is * 20. 12 Runs scanner and checks all 3 * 1000 rows. */ +@RunWith(Parameterized.class) @Category(LargeTests.class) public class TestMobCompactionRegularRegionBatchMode extends TestMobCompactionWithDefaults { private static final Logger LOG = @@ -50,17 +52,18 @@ public class TestMobCompactionRegularRegionBatchMode extends TestMobCompactionWi private static final int batchSize = 7; private MobFileCompactionChore compactionChore; + public TestMobCompactionRegularRegionBatchMode(Boolean useFileBasedSFT) { + super(useFileBasedSFT); + } + @Before public void setUp() throws Exception { super.setUp(); compactionChore = new MobFileCompactionChore(conf, batchSize); } - @BeforeClass - public static void configureCompactionBatches() throws InterruptedException, IOException { - HTU.shutdownMiniHBaseCluster(); + protected void additonalConfigSetup() { conf.setInt(MobConstants.MOB_MAJOR_COMPACTION_REGION_BATCH_SIZE, batchSize); - HTU.startMiniHBaseCluster(); } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionWithDefaults.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionWithDefaults.java index 4a1b3f1b8b3..69ba4ea24b2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionWithDefaults.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionWithDefaults.java @@ -23,6 +23,7 @@ import static org.junit.Assert.fail; import java.io.IOException; import java.util.Arrays; +import java.util.Collection; import java.util.List; import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; @@ -31,6 +32,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseClassTestRule; import 
org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; @@ -41,18 +43,19 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.RegionSplitter; import org.junit.After; -import org.junit.AfterClass; import org.junit.Before; -import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.rules.TestName; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -66,6 +69,7 @@ import org.slf4j.LoggerFactory; * Runs Mob cleaner chore 11 Verifies that number of MOB files in a mob directory is 20. 12 Runs * scanner and checks all 3 * 1000 rows. 
*/ +@RunWith(Parameterized.class) @Category(LargeTests.class) public class TestMobCompactionWithDefaults { private static final Logger LOG = LoggerFactory.getLogger(TestMobCompactionWithDefaults.class); @@ -73,7 +77,7 @@ public class TestMobCompactionWithDefaults { public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestMobCompactionWithDefaults.class); - protected static HBaseTestingUtil HTU; + protected HBaseTestingUtil HTU; protected static Configuration conf; protected static long minAgeToArchive = 10000; @@ -95,8 +99,19 @@ public class TestMobCompactionWithDefaults { protected MobFileCleanerChore cleanerChore; - @BeforeClass - public static void htuStart() throws Exception { + protected Boolean useFileBasedSFT; + + public TestMobCompactionWithDefaults(Boolean useFileBasedSFT) { + this.useFileBasedSFT = useFileBasedSFT; + } + + @Parameterized.Parameters + public static Collection data() { + Boolean[] data = { false, true }; + return Arrays.asList(data); + } + + protected void htuStart() throws Exception { HTU = new HBaseTestingUtil(); conf = HTU.getConfiguration(); conf.setInt("hfile.format.version", 3); @@ -109,21 +124,25 @@ public class TestMobCompactionWithDefaults { // Set compacted file discharger interval to a half minAgeToArchive conf.setLong("hbase.hfile.compaction.discharger.interval", minAgeToArchive / 2); conf.setBoolean("hbase.regionserver.compaction.enabled", false); + if (useFileBasedSFT) { + conf.set(StoreFileTrackerFactory.TRACKER_IMPL, + "org.apache.hadoop.hbase.regionserver.storefiletracker.FileBasedStoreFileTracker"); + } + additonalConfigSetup(); HTU.startMiniCluster(); } - @AfterClass - public static void htuStop() throws Exception { - HTU.shutdownMiniCluster(); + protected void additonalConfigSetup() { } @Before public void setUp() throws Exception { + htuStart(); admin = HTU.getAdmin(); cleanerChore = new MobFileCleanerChore(); familyDescriptor = ColumnFamilyDescriptorBuilder.newBuilder(fam).setMobEnabled(true) 
.setMobThreshold(mobLen).setMaxVersions(1).build(); - tableDescriptor = HTU.createModifyableTableDescriptor(test.getMethodName()) + tableDescriptor = HTU.createModifyableTableDescriptor(TestMobUtils.getTableName(test)) .setColumnFamily(familyDescriptor).build(); RegionSplitter.UniformSplit splitAlgo = new RegionSplitter.UniformSplit(); byte[][] splitKeys = splitAlgo.split(numRegions); @@ -152,6 +171,7 @@ public class TestMobCompactionWithDefaults { public void tearDown() throws Exception { admin.disableTable(tableDescriptor.getTableName()); admin.deleteTable(tableDescriptor.getTableName()); + HTU.shutdownMiniCluster(); } @Test @@ -167,12 +187,12 @@ public class TestMobCompactionWithDefaults { @Test public void testMobFileCompactionAfterSnapshotClone() throws InterruptedException, IOException { - final TableName clone = TableName.valueOf(test.getMethodName() + "-clone"); + final TableName clone = TableName.valueOf(TestMobUtils.getTableName(test) + "-clone"); LOG.info("MOB compaction of cloned snapshot, " + description() + " started"); loadAndFlushThreeTimes(rows, table, famStr); LOG.debug("Taking snapshot and cloning table {}", table); - admin.snapshot(test.getMethodName(), table); - admin.cloneSnapshot(test.getMethodName(), clone); + admin.snapshot(TestMobUtils.getTableName(test), table); + admin.cloneSnapshot(TestMobUtils.getTableName(test), clone); assertEquals("Should have 3 hlinks per region in MOB area from snapshot clone", 3 * numRegions, getNumberOfMobFiles(clone, famStr)); mobCompact(admin.getDescriptor(clone), familyDescriptor); @@ -185,12 +205,12 @@ public class TestMobCompactionWithDefaults { @Test public void testMobFileCompactionAfterSnapshotCloneAndFlush() throws InterruptedException, IOException { - final TableName clone = TableName.valueOf(test.getMethodName() + "-clone"); + final TableName clone = TableName.valueOf(TestMobUtils.getTableName(test) + "-clone"); LOG.info("MOB compaction of cloned snapshot after flush, " + description() + " started"); 
loadAndFlushThreeTimes(rows, table, famStr); LOG.debug("Taking snapshot and cloning table {}", table); - admin.snapshot(test.getMethodName(), table); - admin.cloneSnapshot(test.getMethodName(), clone); + admin.snapshot(TestMobUtils.getTableName(test), table); + admin.cloneSnapshot(TestMobUtils.getTableName(test), clone); assertEquals("Should have 3 hlinks per region in MOB area from snapshot clone", 3 * numRegions, getNumberOfMobFiles(clone, famStr)); loadAndFlushThreeTimes(rows, clone, famStr); @@ -269,8 +289,11 @@ public class TestMobCompactionWithDefaults { Thread.sleep(minAgeToArchive + 1000); LOG.info("Cleaning up MOB files"); - // Cleanup again - cleanerChore.cleanupObsoleteMobFiles(conf, table); + + // run cleaner chore on each RS + for (ServerName sn : admin.getRegionServers()) { + HTU.getMiniHBaseCluster().getRegionServer(sn).getRSMobFileCleanerChore().chore(); + } assertEquals("After cleaning, we should have 1 MOB file per region based on size.", numRegions, getNumberOfMobFiles(table, family)); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFileCleanerChore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFileCleanerChore.java index f950c18dd18..bdc3cce13e4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFileCleanerChore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFileCleanerChore.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.mob; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import java.io.IOException; @@ -166,14 +167,38 @@ public class TestMobFileCleanerChore { Thread.sleep(minAgeToArchive + 1000); LOG.info("Cleaning up MOB files"); - // Cleanup again + // Cleanup chore.cleanupObsoleteMobFiles(conf, table.getName()); + // verify that nothing have happened num = getNumberOfMobFiles(conf, table.getName(), new String(fam)); - assertEquals(1, num); + 
assertEquals(4, num); long scanned = scanTable(); assertEquals(30, scanned); + + // add a MOB file with a name referring to a non-existent region + Path extraMOBFile = MobTestUtil.generateMOBFileForRegion(conf, table.getName(), + familyDescriptor, "nonExistentRegion"); + num = getNumberOfMobFiles(conf, table.getName(), new String(fam)); + assertEquals(5, num); + + LOG.info("Waiting for {}ms", minAgeToArchive + 1000); + + Thread.sleep(minAgeToArchive + 1000); + LOG.info("Cleaning up MOB files"); + chore.cleanupObsoleteMobFiles(conf, table.getName()); + + // check that the extra file got deleted + num = getNumberOfMobFiles(conf, table.getName(), new String(fam)); + assertEquals(4, num); + + FileSystem fs = FileSystem.get(conf); + assertFalse(fs.exists(extraMOBFile)); + + scanned = scanTable(); + assertEquals(30, scanned); + } private long getNumberOfMobFiles(Configuration conf, TableName tableName, String family) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobStoreCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobStoreCompaction.java index 9951bef4747..c2f2b3fd426 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobStoreCompaction.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobStoreCompaction.java @@ -26,11 +26,14 @@ import static org.junit.Assert.assertTrue; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.UUID; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -63,6 +66,7 @@ import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.RegionAsTable; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import 
org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; +import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory; import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.testclassification.MediumTests; @@ -76,12 +80,15 @@ import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.rules.TestName; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * Test mob store compaction */ +@RunWith(Parameterized.class) @Category(MediumTests.class) public class TestMobStoreCompaction { @@ -106,15 +113,33 @@ public class TestMobStoreCompaction { private final byte[] STARTROW = Bytes.toBytes(START_KEY); private int compactionThreshold; + private Boolean useFileBasedSFT; + + public TestMobStoreCompaction(Boolean useFileBasedSFT) { + this.useFileBasedSFT = useFileBasedSFT; + } + + @Parameterized.Parameters + public static Collection data() { + Boolean[] data = { false, true }; + return Arrays.asList(data); + } + private void init(Configuration conf, long mobThreshold) throws Exception { + if (useFileBasedSFT) { + conf.set(StoreFileTrackerFactory.TRACKER_IMPL, + "org.apache.hadoop.hbase.regionserver.storefiletracker.FileBasedStoreFileTracker"); + } + this.conf = conf; this.mobCellThreshold = mobThreshold; + HBaseTestingUtil UTIL = new HBaseTestingUtil(conf); compactionThreshold = conf.getInt("hbase.hstore.compactionThreshold", 3); familyDescriptor = ColumnFamilyDescriptorBuilder.newBuilder(COLUMN_FAMILY).setMobEnabled(true) .setMobThreshold(mobThreshold).setMaxVersions(1).build(); - tableDescriptor = UTIL.createModifyableTableDescriptor(name.getMethodName()) + tableDescriptor = UTIL.createModifyableTableDescriptor(TestMobUtils.getTableName(name)) .modifyColumnFamily(familyDescriptor).build(); 
RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tableDescriptor.getTableName()).build(); @@ -223,7 +248,7 @@ public class TestMobStoreCompaction { Path basedir = new Path(hbaseRootDir, tableDescriptor.getTableName().getNameAsString()); List> hfiles = new ArrayList<>(1); for (int i = 0; i < compactionThreshold; i++) { - Path hpath = new Path(basedir, "hfile" + i); + Path hpath = new Path(basedir, UUID.randomUUID().toString().replace("-", "")); hfiles.add(Pair.newPair(COLUMN_FAMILY, hpath.toString())); createHFile(hpath, i, dummyData); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobUtils.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobUtils.java index 247dfff77d1..32170e98b35 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobUtils.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobUtils.java @@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.util.Bytes; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet; import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSetMultimap; @@ -89,4 +90,8 @@ public class TestMobUtils { assertTrue(testTable3Refs.contains("file3a")); assertTrue(testTable3Refs.contains("file3b")); } + + public static String getTableName(TestName test) { + return test.getMethodName().replace("[", "-").replace("]", ""); + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestRSMobFileCleanerChore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestRSMobFileCleanerChore.java new file mode 100644 index 00000000000..86cb3558f67 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestRSMobFileCleanerChore.java @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mob; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.CompactionState; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import 
org.apache.hadoop.hbase.util.Bytes; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Mob file cleaner chore test. 1. Creates MOB table 2. Load MOB data and flushes it N times 3. Runs + * major MOB compaction 4. Verifies that number of MOB files in a mob directory is N+1 5. Waits for + * a period of time larger than minimum age to archive 6. Runs Mob cleaner chore 7. Verifies that + * every old MOB file referenced from current RS was archived + */ +@Category(MediumTests.class) +public class TestRSMobFileCleanerChore { + private static final Logger LOG = LoggerFactory.getLogger(TestRSMobFileCleanerChore.class); + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestRSMobFileCleanerChore.class); + + private HBaseTestingUtil HTU; + + private final static String famStr = "f1"; + private final static byte[] fam = Bytes.toBytes(famStr); + private final static byte[] qualifier = Bytes.toBytes("q1"); + private final static long mobLen = 10; + private final static byte[] mobVal = Bytes + .toBytes("01234567890123456789012345678901234567890123456789012345678901234567890123456789"); + + private Configuration conf; + private TableDescriptor tableDescriptor; + private ColumnFamilyDescriptor familyDescriptor; + private Admin admin; + private Table table = null; + private RSMobFileCleanerChore chore; + private long minAgeToArchive = 10000; + + public TestRSMobFileCleanerChore() { + } + + @Before + public void setUp() throws Exception { + HTU = new HBaseTestingUtil(); + conf = HTU.getConfiguration(); + + initConf(); + + HTU.startMiniCluster(); + admin = HTU.getAdmin(); + familyDescriptor = ColumnFamilyDescriptorBuilder.newBuilder(fam).setMobEnabled(true) + .setMobThreshold(mobLen).setMaxVersions(1).build(); + tableDescriptor =
HTU.createModifyableTableDescriptor("testMobCompactTable") + .setColumnFamily(familyDescriptor).build(); + table = HTU.createTable(tableDescriptor, Bytes.toByteArrays("1")); + } + + private void initConf() { + + conf.setInt("hfile.format.version", 3); + conf.setLong(TimeToLiveHFileCleaner.TTL_CONF_KEY, 0); + conf.setInt("hbase.client.retries.number", 100); + conf.setInt("hbase.hregion.max.filesize", 200000000); + conf.setInt("hbase.hregion.memstore.flush.size", 800000); + conf.setInt("hbase.hstore.blockingStoreFiles", 150); + conf.setInt("hbase.hstore.compaction.throughput.lower.bound", 52428800); + conf.setInt("hbase.hstore.compaction.throughput.higher.bound", 2 * 52428800); + // conf.set(MobStoreEngine.DEFAULT_MOB_COMPACTOR_CLASS_KEY, + // FaultyMobStoreCompactor.class.getName()); + // Disable automatic MOB compaction + conf.setLong(MobConstants.MOB_COMPACTION_CHORE_PERIOD, 0); + // Disable automatic MOB file cleaner chore + conf.setLong(MobConstants.MOB_CLEANER_PERIOD, 0); + // Set minimum age to archive to 10 sec + conf.setLong(MobConstants.MIN_AGE_TO_ARCHIVE_KEY, minAgeToArchive); + // Set compacted file discharger interval to a half minAgeToArchive + conf.setLong("hbase.hfile.compaction.discharger.interval", minAgeToArchive / 2); + } + + private void loadData(int start, int num) { + try { + + for (int i = 0; i < num; i++) { + Put p = new Put(Bytes.toBytes(start + i)); + p.addColumn(fam, qualifier, mobVal); + table.put(p); + } + admin.flush(table.getName()); + } catch (Exception e) { + LOG.error("MOB file cleaner chore test FAILED", e); + assertTrue(false); + } + } + + @After + public void tearDown() throws Exception { + admin.disableTable(tableDescriptor.getTableName()); + admin.deleteTable(tableDescriptor.getTableName()); + HTU.shutdownMiniCluster(); + } + + @Test + public void testMobFileCleanerChore() throws InterruptedException, IOException { + loadData(0, 10); + loadData(10, 10); + // loadData(20, 10); + long num = getNumberOfMobFiles(conf, 
table.getName(), new String(fam)); + assertEquals(2, num); + // Major compact + admin.majorCompact(tableDescriptor.getTableName(), fam); + // wait until compaction is complete + while (admin.getCompactionState(tableDescriptor.getTableName()) != CompactionState.NONE) { + Thread.sleep(100); + } + + num = getNumberOfMobFiles(conf, table.getName(), new String(fam)); + assertEquals(3, num); + // We have a guarantee that the compacted file discharger will run during this pause + // because its interval is shorter than this wait time + LOG.info("Waiting for {}ms", minAgeToArchive + 1000); + + Thread.sleep(minAgeToArchive + 1000); + LOG.info("Cleaning up MOB files"); + + ServerName serverUsed = null; + List serverRegions = null; + for (ServerName sn : admin.getRegionServers()) { + serverRegions = admin.getRegions(sn); + if (serverRegions != null && serverRegions.size() > 0) { + // filtering out non test table regions + serverRegions = serverRegions.stream().filter(r -> r.getTable() == table.getName()) + .collect(Collectors.toList()); + // if such one is found use this rs + if (serverRegions.size() > 0) { + serverUsed = sn; + } + break; + } + } + + chore = HTU.getMiniHBaseCluster().getRegionServer(serverUsed).getRSMobFileCleanerChore(); + + chore.chore(); + + num = getNumberOfMobFiles(conf, table.getName(), new String(fam)); + assertEquals(3 - serverRegions.size(), num); + + long scanned = scanTable(); + assertEquals(20, scanned); + + // creating a MOB file not referenced from the current RS + Path extraMOBFile = MobTestUtil.generateMOBFileForRegion(conf, table.getName(), + familyDescriptor, "nonExistentRegion"); + + // verifying the new MOBfile is added + num = getNumberOfMobFiles(conf, table.getName(), new String(fam)); + assertEquals(4 - serverRegions.size(), num); + + FileSystem fs = FileSystem.get(conf); + assertTrue(fs.exists(extraMOBFile)); + + LOG.info("Waiting for {}ms", minAgeToArchive + 1000); + + Thread.sleep(minAgeToArchive + 1000); + LOG.info("Cleaning up MOB files");
+ + // running chore again + chore.chore(); + + // the chore should only archive old MOB files that were referenced from the current RS + // the unrelated MOB file is still there + num = getNumberOfMobFiles(conf, table.getName(), new String(fam)); + assertEquals(4 - serverRegions.size(), num); + + assertTrue(fs.exists(extraMOBFile)); + + scanned = scanTable(); + assertEquals(20, scanned); + } + + private long getNumberOfMobFiles(Configuration conf, TableName tableName, String family) + throws IOException { + FileSystem fs = FileSystem.get(conf); + Path dir = MobUtils.getMobFamilyPath(conf, tableName, family); + FileStatus[] stat = fs.listStatus(dir); + for (FileStatus st : stat) { + LOG.debug("DDDD MOB Directory content: {} size={}", st.getPath(), st.getLen()); + } + LOG.debug("MOB Directory content total files: {}", stat.length); + + return stat.length; + } + + private long scanTable() { + try { + + Result result; + ResultScanner scanner = table.getScanner(fam); + long counter = 0; + while ((result = scanner.next()) != null) { + assertTrue(Arrays.equals(result.getValue(fam, qualifier), mobVal)); + counter++; + } + return counter; + } catch (Exception e) { + e.printStackTrace(); + LOG.error("MOB file cleaner chore test FAILED"); + if (HTU != null) { + assertTrue(false); + } else { + System.exit(-1); + } + } + return 0; + } +}