diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java index e93d87237cc..e46eb0f6209 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java @@ -294,20 +294,20 @@ public class DefaultMobStoreCompactor extends DefaultCompactor { * @param cleanSeqId When true, remove seqId(used to be mvcc) value which is <= * smallestReadPoint * @param throughputController The compaction throughput controller. - * @param major Is a major compaction. - * @param numofFilesToCompact the number of files to compact + * @param request compaction request. * @param progress Progress reporter. * @return Whether compaction ended; false if it was interrupted for any reason. */ @Override protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer, long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController, - boolean major, int numofFilesToCompact, CompactionProgress progress) throws IOException { + CompactionRequestImpl request, CompactionProgress progress) throws IOException { long bytesWrittenProgressForLog = 0; long bytesWrittenProgressForShippedCall = 0; // Clear old mob references mobRefSet.get().clear(); boolean isUserRequest = userRequest.get(); + boolean major = request.isAllFiles(); boolean compactMOBs = major && isUserRequest; boolean discardMobMiss = conf.getBoolean(MobConstants.MOB_UNSAFE_DISCARD_MISS_KEY, MobConstants.DEFAULT_MOB_DISCARD_MISS); @@ -351,12 +351,12 @@ public class DefaultMobStoreCompactor extends DefaultCompactor { throughputController.start(compactionName); KeyValueScanner kvs = (scanner instanceof KeyValueScanner) ? 
(KeyValueScanner) scanner : null; long shippedCallSizeLimit = - (long) numofFilesToCompact * this.store.getColumnFamilyDescriptor().getBlocksize(); + (long) request.getFiles().size() * this.store.getColumnFamilyDescriptor().getBlocksize(); Cell mobCell = null; try { - mobFileWriter = newMobWriter(fd, major); + mobFileWriter = newMobWriter(fd, major, request.getWriterCreationTracker()); fileName = Bytes.toBytes(mobFileWriter.getPath().getName()); do { @@ -426,7 +426,7 @@ public class DefaultMobStoreCompactor extends DefaultCompactor { LOG.debug("Closing output MOB File, length={} file={}, store={}", len, mobFileWriter.getPath().getName(), getStoreInfo()); commitOrAbortMobWriter(mobFileWriter, fd.maxSeqId, mobCells, major); - mobFileWriter = newMobWriter(fd, major); + mobFileWriter = newMobWriter(fd, major, request.getWriterCreationTracker()); fileName = Bytes.toBytes(mobFileWriter.getPath().getName()); mobCells = 0; } @@ -470,7 +470,7 @@ public class DefaultMobStoreCompactor extends DefaultCompactor { long len = mobFileWriter.getPos(); if (len > maxMobFileSize) { commitOrAbortMobWriter(mobFileWriter, fd.maxSeqId, mobCells, major); - mobFileWriter = newMobWriter(fd, major); + mobFileWriter = newMobWriter(fd, major, request.getWriterCreationTracker()); fileName = Bytes.toBytes(mobFileWriter.getPath().getName()); mobCells = 0; } @@ -522,7 +522,7 @@ public class DefaultMobStoreCompactor extends DefaultCompactor { long len = mobFileWriter.getPos(); if (len > maxMobFileSize) { commitOrAbortMobWriter(mobFileWriter, fd.maxSeqId, mobCells, major); - mobFileWriter = newMobWriter(fd, major); + mobFileWriter = newMobWriter(fd, major, request.getWriterCreationTracker()); fileName = Bytes.toBytes(mobFileWriter.getPath().getName()); mobCells = 0; } @@ -608,11 +608,16 @@ public class DefaultMobStoreCompactor extends DefaultCompactor { } } - private StoreFileWriter newMobWriter(FileDetails fd, boolean major) throws IOException { + private StoreFileWriter newMobWriter(FileDetails fd, boolean major, + Consumer<Path> writerCreationTracker) throws IOException { try { - StoreFileWriter mobFileWriter = mobStore.createWriterInTmp(new Date(fd.latestPutTs), - fd.maxKeyCount, major ? majorCompactionCompression : minorCompactionCompression, - store.getRegionInfo().getStartKey(), true); + StoreFileWriter mobFileWriter = mobStore.getStoreEngine().requireWritingToTmpDirFirst() + ? mobStore.createWriterInTmp(new Date(fd.latestPutTs), fd.maxKeyCount, + major ? majorCompactionCompression : minorCompactionCompression, + store.getRegionInfo().getStartKey(), true) + : mobStore.createWriter(new Date(fd.latestPutTs), fd.maxKeyCount, + major ? majorCompactionCompression : minorCompactionCompression, + store.getRegionInfo().getStartKey(), true, writerCreationTracker); LOG.debug("New MOB writer created={} store={}", mobFileWriter.getPath().getName(), getStoreInfo()); // Add reference we get for compact MOB diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java index a7f2ecdf242..6df17e58e22 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java @@ -127,7 +127,8 @@ public class DefaultMobStoreFlusher extends DefaultStoreFlusher { try { // It's a mob store, flush the cells in a mob way. This is the difference of flushing // between a normal and a mob store.
- performMobFlush(snapshot, cacheFlushId, scanner, writer, status, throughputController); + performMobFlush(snapshot, cacheFlushId, scanner, writer, status, throughputController, + writerCreationTracker); } catch (IOException ioe) { e = ioe; // throw the exception out @@ -171,16 +172,21 @@ public class DefaultMobStoreFlusher extends DefaultStoreFlusher { */ protected void performMobFlush(MemStoreSnapshot snapshot, long cacheFlushId, InternalScanner scanner, StoreFileWriter writer, MonitoredTask status, - ThroughputController throughputController) throws IOException { + ThroughputController throughputController, Consumer<Path> writerCreationTracker) + throws IOException { StoreFileWriter mobFileWriter = null; int compactionKVMax = conf.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT); long mobCount = 0; long mobSize = 0; long time = snapshot.getTimeRangeTracker().getMax(); - mobFileWriter = mobStore.createWriterInTmp(new Date(time), snapshot.getCellsCount(), - store.getColumnFamilyDescriptor().getCompressionType(), store.getRegionInfo().getStartKey(), - false); + mobFileWriter = mobStore.getStoreEngine().requireWritingToTmpDirFirst() + ? mobStore.createWriterInTmp(new Date(time), snapshot.getCellsCount(), + store.getColumnFamilyDescriptor().getCompressionType(), store.getRegionInfo().getStartKey(), + false) + : mobStore.createWriter(new Date(time), snapshot.getCellsCount(), + store.getColumnFamilyDescriptor().getCompressionType(), store.getRegionInfo().getStartKey(), + false, writerCreationTracker); // the target path is {tableName}/.mob/{cfName}/mobFiles // the relative path is mobFiles byte[] fileName = Bytes.toBytes(mobFileWriter.getPath().getName()); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCleanerChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCleanerChore.java index e2c1f8961de..eb2201417b0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCleanerChore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCleanerChore.java @@ -168,14 +168,15 @@ public class MobFileCleanerChore extends ScheduledChore { maxCreationTimeToArchive, table); } + FileSystem fs = FileSystem.get(conf); + Set<String> regionNames = new HashSet<>(); Path rootDir = CommonFSUtils.getRootDir(conf); Path tableDir = CommonFSUtils.getTableDir(rootDir, table); - // How safe is this call?
- List<Path> regionDirs = FSUtils.getRegionDirs(FileSystem.get(conf), tableDir); + List<Path> regionDirs = FSUtils.getRegionDirs(fs, tableDir); Set<String> allActiveMobFileName = new HashSet<String>(); - FileSystem fs = FileSystem.get(conf); for (Path regionPath : regionDirs) { + regionNames.add(regionPath.getName()); for (ColumnFamilyDescriptor hcd : list) { String family = hcd.getNameAsString(); Path storePath = new Path(regionPath, family); @@ -203,13 +204,26 @@ public class MobFileCleanerChore extends ScheduledChore { for (Path pp : storeFiles) { currentPath = pp; LOG.trace("Store file: {}", pp); - HStoreFile sf = - new HStoreFile(fs, pp, conf, CacheConfig.DISABLED, BloomType.NONE, true); - sf.initReader(); - byte[] mobRefData = sf.getMetadataValue(HStoreFile.MOB_FILE_REFS); - byte[] bulkloadMarkerData = sf.getMetadataValue(HStoreFile.BULKLOAD_TASK_KEY); - // close store file to avoid memory leaks - sf.closeStoreFile(true); + HStoreFile sf = null; + byte[] mobRefData = null; + byte[] bulkloadMarkerData = null; + try { + sf = new HStoreFile(fs, pp, conf, CacheConfig.DISABLED, BloomType.NONE, true); + sf.initReader(); + mobRefData = sf.getMetadataValue(HStoreFile.MOB_FILE_REFS); + bulkloadMarkerData = sf.getMetadataValue(HStoreFile.BULKLOAD_TASK_KEY); + // close store file to avoid memory leaks + sf.closeStoreFile(true); + } catch (IOException ex) { + // When FileBased SFT is active the store dir can contain corrupted or incomplete + // files. So read errors are expected. We just skip these files. + if (ex instanceof FileNotFoundException) { + throw ex; + } + LOG.debug("Failed to get mob data from file: {} due to error.", pp.toString(), + ex); + continue; + } if (mobRefData == null) { if (bulkloadMarkerData == null) { LOG.warn("Found old store file with no MOB_FILE_REFS: {} - " @@ -264,9 +278,11 @@ public class MobFileCleanerChore extends ScheduledChore { while (rit.hasNext()) { LocatedFileStatus lfs = rit.next(); Path p = lfs.getPath(); - if (!allActiveMobFileName.contains(p.getName())) { - // MOB is not in a list of active references, but it can be too - // fresh, skip it in this case + String[] mobParts = p.getName().split("_"); + String regionName = mobParts[mobParts.length - 1]; + + if (!regionNames.contains(regionName)) { + // MOB belonged to a region no longer hosted long creationTime = fs.getFileStatus(p).getModificationTime(); if (creationTime < maxCreationTimeToArchive) { LOG.trace("Archiving MOB file {} creation time={}", p, @@ -277,7 +293,7 @@ public class MobFileCleanerChore extends ScheduledChore { fs.getFileStatus(p).getModificationTime()); } } else { - LOG.trace("Keeping active MOB file: {}", p); + LOG.trace("Keeping MOB file with existing region: {}", p); } } LOG.info(" MOB Cleaner found {} files to archive for table={} family={}", toArchive.size(), diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java index a409c9f42c4..aab589cc13f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java @@ -33,6 +33,7 @@ import java.util.Date; import java.util.List; import java.util.Optional; import java.util.UUID; +import java.util.function.Consumer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -584,6 +585,33 @@ public final class MobUtils { CacheConfig cacheConfig, Encryption.Context cryptoContext, ChecksumType checksumType, int
bytesPerChecksum, int blocksize, BloomType bloomType, boolean isCompaction) throws IOException { + return createWriter(conf, fs, family, path, maxKeyCount, compression, cacheConfig, + cryptoContext, checksumType, bytesPerChecksum, blocksize, bloomType, isCompaction, null); + } + + /** + * Creates a writer for the mob file in temp directory. + * @param conf The current configuration. + * @param fs The current file system. + * @param family The descriptor of the current column family. + * @param path The path for a temp directory. + * @param maxKeyCount The key count. + * @param compression The compression algorithm. + * @param cacheConfig The current cache config. + * @param cryptoContext The encryption context. + * @param checksumType The checksum type. + * @param bytesPerChecksum The bytes per checksum. + * @param blocksize The HFile block size. + * @param bloomType The bloom filter type. + * @param isCompaction If the writer is used in compaction. + * @param writerCreationTracker to track the current writer in the store + * @return The writer for the mob file. + */ + public static StoreFileWriter createWriter(Configuration conf, FileSystem fs, + ColumnFamilyDescriptor family, Path path, long maxKeyCount, Compression.Algorithm compression, + CacheConfig cacheConfig, Encryption.Context cryptoContext, ChecksumType checksumType, + int bytesPerChecksum, int blocksize, BloomType bloomType, boolean isCompaction, + Consumer<Path> writerCreationTracker) throws IOException { if (compression == null) { compression = HFile.DEFAULT_COMPRESSION_ALGORITHM; } @@ -602,7 +630,8 @@ public final class MobUtils { .withCreateTime(EnvironmentEdgeManager.currentTime()).build(); StoreFileWriter w = new StoreFileWriter.Builder(conf, writerCacheConf, fs).withFilePath(path) - .withBloomType(bloomType).withMaxKeyCount(maxKeyCount).withFileContext(hFileContext).build(); + .withBloomType(bloomType).withMaxKeyCount(maxKeyCount).withFileContext(hFileContext) + .withWriterCreationTracker(writerCreationTracker).build(); return w; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/RSMobFileCleanerChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/RSMobFileCleanerChore.java new file mode 100644 index 00000000000..06e34988733 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/RSMobFileCleanerChore.java @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.hadoop.hbase.mob; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.hbase.ScheduledChore; +import org.apache.hadoop.hbase.TableDescriptors; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.backup.HFileArchiver; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.HStore; +import org.apache.hadoop.hbase.regionserver.HStoreFile; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.CommonFSUtils; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.common.collect.SetMultimap; + +/** + * The RSMobFileCleanerChore runs the cleaner regularly to remove obsolete mob files (files which + * no longer have active references) that were referenced from the current RS. + */ +@InterfaceAudience.Private +public class RSMobFileCleanerChore extends ScheduledChore { + + private static final Logger LOG = LoggerFactory.getLogger(RSMobFileCleanerChore.class); + private final HRegionServer rs; + + public RSMobFileCleanerChore(HRegionServer rs) { + super(rs.getServerName() + "-MobFileCleanerChore", rs, + rs.getConfiguration().getInt(MobConstants.MOB_CLEANER_PERIOD, + MobConstants.DEFAULT_MOB_CLEANER_PERIOD), + Math.round(rs.getConfiguration().getInt(MobConstants.MOB_CLEANER_PERIOD, + MobConstants.DEFAULT_MOB_CLEANER_PERIOD) + * ((ThreadLocalRandom.current().nextDouble() + 0.5D))), + TimeUnit.SECONDS); + // to prevent a load spike on the fs the initial delay is modified by +/- 50% + this.rs = rs; + } + + public RSMobFileCleanerChore() { + this.rs = null; + } + + @Override + protected void chore() { + + long minAgeToArchive = rs.getConfiguration().getLong(MobConstants.MIN_AGE_TO_ARCHIVE_KEY, + MobConstants.DEFAULT_MIN_AGE_TO_ARCHIVE); + // We check only those MOB files whose creation time is less than + // maxCreationTimeToArchive, i.e. the current time minus minAgeToArchive + // (one hour by default). The gap gives us full confidence that all corresponding + // store files will exist at the time the cleaning procedure begins and will be examined. + // So, if a MOB file's creation time is greater than maxCreationTimeToArchive, + // it will be skipped and won't be archived.
+ long maxCreationTimeToArchive = EnvironmentEdgeManager.currentTime() - minAgeToArchive; + + TableDescriptors htds = rs.getTableDescriptors(); + try { + FileSystem fs = FileSystem.get(rs.getConfiguration()); + + Map<String, TableDescriptor> map = null; + try { + map = htds.getAll(); + } catch (IOException e) { + LOG.error("MobFileCleanerChore failed", e); + return; + } + Map<String, Map<String, List<String>>> referencedMOBs = new HashMap<>(); + for (TableDescriptor htd : map.values()) { + // Now clean obsolete files for a table + LOG.info("Cleaning obsolete MOB files from table={}", htd.getTableName()); + List<ColumnFamilyDescriptor> list = MobUtils.getMobColumnFamilies(htd); + List<HRegion> regions = rs.getRegions(htd.getTableName()); + for (HRegion region : regions) { + for (ColumnFamilyDescriptor hcd : list) { + HStore store = region.getStore(hcd.getName()); + Collection<HStoreFile> sfs = store.getStorefiles(); + Set<String> regionMobs = new HashSet<String>(); + Path currentPath = null; + try { + // collecting referenced MOBs + for (HStoreFile sf : sfs) { + currentPath = sf.getPath(); + sf.initReader(); + byte[] mobRefData = sf.getMetadataValue(HStoreFile.MOB_FILE_REFS); + byte[] bulkloadMarkerData = sf.getMetadataValue(HStoreFile.BULKLOAD_TASK_KEY); + // close store file to avoid memory leaks + sf.closeStoreFile(true); + if (mobRefData == null) { + if (bulkloadMarkerData == null) { + LOG.warn( + "Found old store file with no MOB_FILE_REFS: {} - " + + "can not proceed until all old files will be MOB-compacted.", + currentPath); + return; + } else { + LOG.debug("Skipping file without MOB references (bulkloaded file):{}", + currentPath); + continue; + } + } + // file may or may not have MOB references, but was created by the distributed + // mob compaction code. + try { + SetMultimap<TableName, String> mobs = + MobUtils.deserializeMobFileRefs(mobRefData).build(); + LOG.debug("Found {} mob references for store={}", mobs.size(), sf); + LOG.trace("Specific mob references found for store={} : {}", sf, mobs); + regionMobs.addAll(mobs.values()); + } catch (RuntimeException exception) { + throw new IOException("failure getting mob references for hfile " + sf, + exception); + } + } + // also collect MOB files that are currently being written + regionMobs.addAll(store.getStoreFilesBeingWritten().stream() + .map(path -> path.getName()).collect(Collectors.toList())); + + referencedMOBs + .computeIfAbsent(hcd.getNameAsString(), cf -> new HashMap<String, List<String>>()) + .computeIfAbsent(region.getRegionInfo().getEncodedName(), name -> new ArrayList<>()) + .addAll(regionMobs); + + } catch (FileNotFoundException e) { + LOG.warn( + "Missing file:{} Starting MOB cleaning cycle from the beginning" + " due to error", + currentPath, e); + regionMobs.clear(); + continue; + } catch (IOException e) { + LOG.error("Failed to clean the obsolete mob files for table={}", + htd.getTableName().getNameAsString(), e); + } + } + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Found: {} active mob refs for table={}", + referencedMOBs.values().stream().map(inner -> inner.values()) + .flatMap(lists -> lists.stream()).mapToInt(lists -> lists.size()).sum(), + htd.getTableName().getNameAsString()); + } + if (LOG.isTraceEnabled()) { + referencedMOBs.values().stream().forEach(innerMap -> innerMap.values().stream() + .forEach(mobFileList -> mobFileList.stream().forEach(LOG::trace))); + } + + // collect regions referencing MOB files belonging to the current rs + Set<String> regionsCovered = new HashSet<>(); + referencedMOBs.values().stream() + .forEach(regionMap -> regionsCovered.addAll(regionMap.keySet())); + + for (ColumnFamilyDescriptor hcd : list) { + List<Path> toArchive = new ArrayList<Path>(); + 
String family = hcd.getNameAsString(); + Path dir = MobUtils.getMobFamilyPath(rs.getConfiguration(), htd.getTableName(), family); + RemoteIterator<LocatedFileStatus> rit = fs.listLocatedStatus(dir); + while (rit.hasNext()) { + LocatedFileStatus lfs = rit.next(); + Path p = lfs.getPath(); + String[] mobParts = p.getName().split("_"); + String regionName = mobParts[mobParts.length - 1]; + + // skip MOB files not belonging to a region assigned to the current rs + if (!regionsCovered.contains(regionName)) { + LOG.trace("MOB file does not belong to current rs: {}", p); + continue; + } + + // check active or actively written mob files + Map<String, List<String>> cfMobs = referencedMOBs.get(hcd.getNameAsString()); + if ( + cfMobs != null && cfMobs.get(regionName) != null + && cfMobs.get(regionName).contains(p.getName()) + ) { + LOG.trace("Keeping active MOB file: {}", p); + continue; + } + + // MOB is not in a list of active references, but it can be too + // fresh, skip it in this case + long creationTime = fs.getFileStatus(p).getModificationTime(); + if (creationTime < maxCreationTimeToArchive) { + LOG.trace("Archiving MOB file {} creation time={}", p, + (fs.getFileStatus(p).getModificationTime())); + toArchive.add(p); + } else { + LOG.trace("Skipping fresh file: {}. Creation time={}", p, + fs.getFileStatus(p).getModificationTime()); + } + + } + LOG.info(" MOB Cleaner found {} files to archive for table={} family={}", + toArchive.size(), htd.getTableName().getNameAsString(), family); + archiveMobFiles(rs.getConfiguration(), htd.getTableName(), family.getBytes(), toArchive); + LOG.info(" MOB Cleaner archived {} files, table={} family={}", toArchive.size(), + htd.getTableName().getNameAsString(), family); + } + + LOG.info("Cleaning obsolete MOB files finished for table={}", htd.getTableName()); + + } + } catch (IOException e) { + LOG.error("MOB Cleaner failed when trying to access the file system", e); + } + } + + /** + * Archives the mob files. + * @param conf The current configuration. + * @param tableName The table name. + * @param family The name of the column family. + * @param storeFiles The files to be archived.
+ * @throws IOException exception + */ + public void archiveMobFiles(Configuration conf, TableName tableName, byte[] family, + List<Path> storeFiles) throws IOException { + + if (storeFiles.size() == 0) { + // nothing to remove + LOG.debug("Skipping archiving old MOB files - no files found for table={} cf={}", tableName, + Bytes.toString(family)); + return; + } + Path mobTableDir = CommonFSUtils.getTableDir(MobUtils.getMobHome(conf), tableName); + FileSystem fs = storeFiles.get(0).getFileSystem(conf); + + for (Path p : storeFiles) { + LOG.debug("MOB Cleaner is archiving: {}", p); + HFileArchiver.archiveStoreFile(conf, fs, MobUtils.getMobRegionInfo(tableName), mobTableDir, + family, p); + } + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java index ac69eb8d324..5992c5941e9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java @@ -28,6 +28,7 @@ import java.util.Optional; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -184,7 +185,27 @@ public class HMobStore extends HStore { } Path path = getTempDir(); return createWriterInTmp(MobUtils.formatDate(date), path, maxKeyCount, compression, startKey, - isCompaction); + isCompaction, null); + } + + /** + * Creates the writer for the mob file in the mob family directory. + * @param date The latest date of written cells. + * @param maxKeyCount The key count. + * @param compression The compression algorithm. + * @param startKey The start key. + * @param isCompaction If the writer is used in compaction. + * @return The writer for the mob file. + */ + public StoreFileWriter createWriter(Date date, long maxKeyCount, + Compression.Algorithm compression, byte[] startKey, boolean isCompaction, + Consumer<Path> writerCreationTracker) throws IOException { + if (startKey == null) { + startKey = HConstants.EMPTY_START_ROW; + } + Path path = getPath(); + return createWriterInTmp(MobUtils.formatDate(date), path, maxKeyCount, compression, startKey, + isCompaction, writerCreationTracker); } /** @@ -198,11 +219,13 @@ public class HMobStore extends HStore { * @return The writer for the mob file. */ public StoreFileWriter createWriterInTmp(String date, Path basePath, long maxKeyCount, - Compression.Algorithm compression, byte[] startKey, boolean isCompaction) throws IOException { + Compression.Algorithm compression, byte[] startKey, boolean isCompaction, + Consumer<Path> writerCreationTracker) throws IOException { MobFileName mobFileName = MobFileName.create(startKey, date, UUID.randomUUID().toString().replaceAll("-", ""), getHRegion().getRegionInfo().getEncodedName()); - return createWriterInTmp(mobFileName, basePath, maxKeyCount, compression, isCompaction); + return createWriterInTmp(mobFileName, basePath, maxKeyCount, compression, isCompaction, + writerCreationTracker); } /** @@ -214,13 +237,15 @@ public class HMobStore extends HStore { * @param isCompaction If the writer is used in compaction. * @return The writer for the mob file.
*/ + public StoreFileWriter createWriterInTmp(MobFileName mobFileName, Path basePath, long maxKeyCount, - Compression.Algorithm compression, boolean isCompaction) throws IOException { + Compression.Algorithm compression, boolean isCompaction, Consumer<Path> writerCreationTracker) + throws IOException { return MobUtils.createWriter(conf, getFileSystem(), getColumnFamilyDescriptor(), new Path(basePath, mobFileName.getFileName()), maxKeyCount, compression, getCacheConfig(), getStoreContext().getEncryptionContext(), StoreUtils.getChecksumType(conf), StoreUtils.getBytesPerChecksum(conf), getStoreContext().getBlockSize(), BloomType.NONE, - isCompaction); + isCompaction, writerCreationTracker); } /** @@ -234,6 +259,10 @@ public class HMobStore extends HStore { } Path dstPath = new Path(targetPath, sourceFile.getName()); validateMobFile(sourceFile); + if (sourceFile.equals(targetPath)) { + LOG.info("File is already in the destination dir: {}", sourceFile); + return; + } LOG.info(" FLUSH Renaming flushed file from {} to {}", sourceFile, dstPath); Path parent = dstPath.getParent(); if (!getFileSystem().exists(parent)) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 268e9822b9e..46193d538d9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -131,6 +131,7 @@ import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.master.LoadBalancer; import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer; import org.apache.hadoop.hbase.mob.MobFileCache; +import org.apache.hadoop.hbase.mob.RSMobFileCleanerChore; import org.apache.hadoop.hbase.monitoring.TaskMonitor; import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder; import org.apache.hadoop.hbase.namequeues.SlowLogTableOpsChore; @@ -553,6 +554,8 @@ public class HRegionServer extends Thread private BrokenStoreFileCleaner brokenStoreFileCleaner; + private RSMobFileCleanerChore rsMobFileCleanerChore; + @InterfaceAudience.Private CompactedHFilesDischarger compactedFileDischarger; @@ -2181,6 +2184,10 @@ public class HRegionServer extends Thread choreService.scheduleChore(brokenStoreFileCleaner); } + if (this.rsMobFileCleanerChore != null) { + choreService.scheduleChore(rsMobFileCleanerChore); + } + // Leases is not a Thread. Internally it runs a daemon thread. If it gets // an unhandled exception, it will just exit.
Threads.setDaemonThreadRunning(this.leaseManager, getName() + ".leaseChecker", @@ -2277,6 +2284,8 @@ public class HRegionServer extends Thread new BrokenStoreFileCleaner((int) (brokenStoreFileCleanerDelay + jitterValue), brokenStoreFileCleanerPeriod, this, conf, this); + this.rsMobFileCleanerChore = new RSMobFileCleanerChore(this); + registerConfigurationObservers(); } @@ -2759,6 +2768,7 @@ public class HRegionServer extends Thread shutdownChore(storefileRefresher); shutdownChore(fsUtilizationChore); shutdownChore(slowLogTableOpsChore); + shutdownChore(rsMobFileCleanerChore); // cancel the remaining scheduled chores (in case we missed out any) // TODO: cancel will not cleanup the chores, so we need make sure we do not miss any choreService.shutdown(); @@ -4022,4 +4032,9 @@ public class HRegionServer extends Thread public BrokenStoreFileCleaner getBrokenStoreFileCleaner() { return brokenStoreFileCleaner; } + + @InterfaceAudience.Private + public RSMobFileCleanerChore getRSMobFileCleanerChore() { + return rsMobFileCleanerChore; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index b0dfa92336e..c698cfd8aba 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -2421,7 +2421,7 @@ public class HStore * {@link BrokenStoreFileCleaner} to prevent deleting these files as they are not present in * SFT yet. */ - Set<Path> getStoreFilesBeingWritten() { + public Set<Path> getStoreFilesBeingWritten() { return storeFileWriterCreationTrackers.stream().flatMap(t -> t.get().stream()) .collect(Collectors.toSet()); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java index 0de3eeb024d..566e708b06d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java @@ -361,7 +361,7 @@ public abstract class Compactor { writer = sinkFactory.createWriter(scanner, fd, dropCache, request.isMajor(), request.getWriterCreationTracker()); finished = performCompaction(fd, scanner, writer, smallestReadPoint, cleanSeqId, - throughputController, request.isAllFiles(), request.getFiles().size(), progress); + throughputController, request, progress); if (!finished) { throw new InterruptedIOException("Aborting compaction of store " + store + " in region " + store.getRegionInfo().getRegionNameAsString() + " because it was interrupted."); @@ -400,20 +400,19 @@ public abstract class Compactor { /** * Performs the compaction. - * @param fd FileDetails of cell sink writer - * @param scanner Where to read from. - * @param writer Where to write to. - * @param smallestReadPoint Smallest read point. - * @param cleanSeqId When true, remove seqId(used to be mvcc) value which is <= - * smallestReadPoint - * @param major Is a major compaction. - * @param numofFilesToCompact the number of files to compact - * @param progress Progress reporter. + * @param fd FileDetails of cell sink writer + * @param scanner Where to read from. + * @param writer Where to write to. + * @param smallestReadPoint Smallest read point.
+ * @param cleanSeqId When true, remove seqId(used to be mvcc) value which is <= + * smallestReadPoint + * @param request compaction request. + * @param progress Progress reporter. * @return Whether compaction ended; false if it was interrupted for some reason. */ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer, long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController, - boolean major, int numofFilesToCompact, CompactionProgress progress) throws IOException { + CompactionRequestImpl request, CompactionProgress progress) throws IOException { assert writer instanceof ShipperListener; long bytesWrittenProgressForLog = 0; long bytesWrittenProgressForShippedCall = 0; @@ -435,7 +434,7 @@ public abstract class Compactor { throughputController.start(compactionName); KeyValueScanner kvs = (scanner instanceof KeyValueScanner) ? (KeyValueScanner) scanner : null; long shippedCallSizeLimit = - (long) numofFilesToCompact * this.store.getColumnFamilyDescriptor().getBlocksize(); + (long) request.getFiles().size() * this.store.getColumnFamilyDescriptor().getBlocksize(); try { do { hasMore = scanner.next(cells, scannerContext); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/FaultyMobStoreCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/FaultyMobStoreCompactor.java index 324b897d9fd..d37cc605f6b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/FaultyMobStoreCompactor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/FaultyMobStoreCompactor.java @@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.regionserver.ShipperListener; import org.apache.hadoop.hbase.regionserver.StoreFileWriter; import org.apache.hadoop.hbase.regionserver.compactions.CloseChecker; import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl; import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil; import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; import org.apache.hadoop.hbase.util.Bytes; @@ -91,8 +92,9 @@ public class FaultyMobStoreCompactor extends DefaultMobStoreCompactor { @Override protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer, long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController, - boolean major, int numofFilesToCompact, CompactionProgress progress) throws IOException { + CompactionRequestImpl request, CompactionProgress progress) throws IOException { + boolean major = request.isAllFiles(); totalCompactions.incrementAndGet(); if (major) { totalMajorCompactions.incrementAndGet(); @@ -143,7 +145,7 @@ public class FaultyMobStoreCompactor extends DefaultMobStoreCompactor { throughputController.start(compactionName); KeyValueScanner kvs = (scanner instanceof KeyValueScanner) ? 
(KeyValueScanner) scanner : null; long shippedCallSizeLimit = - (long) numofFilesToCompact * this.store.getColumnFamilyDescriptor().getBlocksize(); + (long) request.getFiles().size() * this.store.getColumnFamilyDescriptor().getBlocksize(); Cell mobCell = null; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/MobTestUtil.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/MobTestUtil.java index 54563e2eb90..2641ed227ae 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/MobTestUtil.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/MobTestUtil.java @@ -18,17 +18,26 @@ package org.apache.hadoop.hbase.mob; import java.io.IOException; +import java.util.Date; import java.util.List; import java.util.Random; import java.util.concurrent.ThreadLocalRandom; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.io.compress.Compression; +import org.apache.hadoop.hbase.io.crypto.Encryption; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.regionserver.StoreFileWriter; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @@ -112,4 +121,17 @@ public class MobTestUtil { scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE)); return util.countRows(table, scan); } + + public static Path generateMOBFileForRegion(Configuration conf, TableName tableName, + ColumnFamilyDescriptor familyDescriptor, String regionName) throws IOException { + Date date = new Date(); + String dateStr = MobUtils.formatDate(date); + FileSystem fs = FileSystem.get(conf); + Path cfMOBDir = MobUtils.getMobFamilyPath(conf, tableName, familyDescriptor.getNameAsString()); + StoreFileWriter writer = MobUtils.createWriter(conf, fs, familyDescriptor, dateStr, cfMOBDir, + 1000L, Compression.Algorithm.NONE, "startKey", CacheConfig.DISABLED, Encryption.Context.NONE, + false, regionName); + writer.close(); + return writer.getPath(); + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestDefaultMobStoreFlusher.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestDefaultMobStoreFlusher.java index a07a3b0ab2d..056afaf6fc4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestDefaultMobStoreFlusher.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestDefaultMobStoreFlusher.java @@ -17,30 +17,37 @@ */ package org.apache.hadoop.hbase.mob; +import java.util.Arrays; +import java.util.Collection; import java.util.List; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; import org.apache.hadoop.hbase.client.Put; import
org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.util.Bytes; -import org.junit.AfterClass; +import org.junit.After; import org.junit.Assert; -import org.junit.BeforeClass; +import org.junit.Before; import org.junit.ClassRule; import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.rules.TestName; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +@RunWith(Parameterized.class) @Category(LargeTests.class) public class TestDefaultMobStoreFlusher { @@ -60,41 +67,52 @@ public class TestDefaultMobStoreFlusher { @Rule public TestName name = new TestName(); - @BeforeClass - public static void setUpBeforeClass() throws Exception { + protected Boolean useFileBasedSFT; + + public TestDefaultMobStoreFlusher(Boolean useFileBasedSFT) { + this.useFileBasedSFT = useFileBasedSFT; + } + + @Parameterized.Parameters + public static Collection data() { + Boolean[] data = { false, true }; + return Arrays.asList(data); + } + + @Before + public void setUpBefore() throws Exception { + if (useFileBasedSFT) { + TEST_UTIL.getConfiguration().set(StoreFileTrackerFactory.TRACKER_IMPL, + "org.apache.hadoop.hbase.regionserver.storefiletracker.FileBasedStoreFileTracker"); + } TEST_UTIL.startMiniCluster(1); } - @AfterClass - public static void tearDownAfterClass() throws Exception { + @After + public void tearDownAfter() throws Exception { TEST_UTIL.shutdownMiniCluster(); } @Test public void testFlushNonMobFile() throws Exception { - final TableName tableName = TableName.valueOf(name.getMethodName()); - HTableDescriptor desc = new HTableDescriptor(tableName); - HColumnDescriptor hcd = new HColumnDescriptor(family); - hcd.setMaxVersions(4); - desc.addFamily(hcd); - - testFlushFile(desc); + final TableName tableName = TableName.valueOf(TestMobUtils.getTableName(name)); + TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName) + .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(4).build()) + .build(); + testFlushFile(tableDescriptor); } @Test public void testFlushMobFile() throws Exception { - final TableName tableName = TableName.valueOf(name.getMethodName()); - HTableDescriptor desc = new HTableDescriptor(tableName); - HColumnDescriptor hcd = new HColumnDescriptor(family); - hcd.setMobEnabled(true); - hcd.setMobThreshold(3L); - hcd.setMaxVersions(4); - desc.addFamily(hcd); - - testFlushFile(desc); + final TableName tableName = TableName.valueOf(TestMobUtils.getTableName(name)); + TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName) + .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(family).setMobEnabled(true) + .setMobThreshold(3L).setMaxVersions(4).build()) + .build(); + testFlushFile(tableDescriptor); } - private void testFlushFile(HTableDescriptor htd) throws Exception { + private void testFlushFile(TableDescriptor htd) throws Exception { Table table = null; try { table = TEST_UTIL.createTable(htd, null); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionBase.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionBase.java deleted file mode 100644 index 62ca370197b..00000000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionBase.java +++ /dev/null @@ -1,218 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.mob; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import java.io.IOException; -import java.util.Arrays; -import java.util.Random; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.CompactionState; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.RegionSplitter; -import org.junit.After; -import org.junit.Before; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Mob file compaction base test. 1. Enables batch mode for regular MOB compaction, Sets batch size - * to 7 regions. (Optional) 2. Disables periodic MOB compactions, sets minimum age to archive to 10 - * sec 3. Creates MOB table with 20 regions 4. Loads MOB data (randomized keys, 1000 rows), flushes - * data. 5. Repeats 4. two more times 6. Verifies that we have 20 *3 = 60 mob files (equals to - * number of regions x 3) 7. Runs major MOB compaction. 8. Verifies that number of MOB files in a - * mob directory is 20 x4 = 80 9. Waits for a period of time larger than minimum age to archive 10. - * Runs Mob cleaner chore 11 Verifies that number of MOB files in a mob directory is 20. 12 Runs - * scanner and checks all 3 * 1000 rows. 
- */ -@SuppressWarnings("deprecation") -public abstract class TestMobCompactionBase { - private static final Logger LOG = LoggerFactory.getLogger(TestMobCompactionBase.class); - - protected HBaseTestingUtility HTU; - - protected final static String famStr = "f1"; - protected final static byte[] fam = Bytes.toBytes(famStr); - protected final static byte[] qualifier = Bytes.toBytes("q1"); - protected final static long mobLen = 10; - protected final static byte[] mobVal = Bytes - .toBytes("01234567890123456789012345678901234567890123456789012345678901234567890123456789"); - - protected Configuration conf; - protected HTableDescriptor hdt; - private HColumnDescriptor hcd; - protected Admin admin; - protected Table table = null; - protected long minAgeToArchive = 10000; - protected int numRegions = 20; - protected int rows = 1000; - - protected MobFileCleanerChore cleanerChore; - - public TestMobCompactionBase() { - } - - @Before - public void setUp() throws Exception { - HTU = new HBaseTestingUtility(); - hdt = HTU.createTableDescriptor(getClass().getName()); - conf = HTU.getConfiguration(); - - initConf(); - - HTU.startMiniCluster(); - admin = HTU.getAdmin(); - cleanerChore = new MobFileCleanerChore(); - hcd = new HColumnDescriptor(fam); - hcd.setMobEnabled(true); - hcd.setMobThreshold(mobLen); - hcd.setMaxVersions(1); - hdt.addFamily(hcd); - RegionSplitter.UniformSplit splitAlgo = new RegionSplitter.UniformSplit(); - byte[][] splitKeys = splitAlgo.split(numRegions); - table = HTU.createTable(hdt, splitKeys); - - } - - protected void initConf() { - - conf.setInt("hfile.format.version", 3); - // Disable automatic MOB compaction - conf.setLong(MobConstants.MOB_COMPACTION_CHORE_PERIOD, 0); - // Disable automatic MOB file cleaner chore - conf.setLong(MobConstants.MOB_CLEANER_PERIOD, 0); - // Set minimum age to archive to 10 sec - conf.setLong(MobConstants.MIN_AGE_TO_ARCHIVE_KEY, minAgeToArchive); - // Set compacted file discharger interval to a half minAgeToArchive - conf.setLong("hbase.hfile.compaction.discharger.interval", minAgeToArchive / 2); - } - - private void loadData(int num) { - - Random r = new Random(); - try { - LOG.info("Started loading {} rows", num); - for (int i = 0; i < num; i++) { - byte[] key = new byte[32]; - r.nextBytes(key); - Put p = new Put(key); - p.addColumn(fam, qualifier, mobVal); - table.put(p); - } - admin.flush(table.getName()); - LOG.info("Finished loading {} rows", num); - } catch (Exception e) { - LOG.error("MOB file compaction chore test FAILED", e); - fail("MOB file compaction chore test FAILED"); - } - } - - @After - public void tearDown() throws Exception { - admin.disableTable(hdt.getTableName()); - admin.deleteTable(hdt.getTableName()); - HTU.shutdownMiniCluster(); - } - - public void baseTestMobFileCompaction() throws InterruptedException, IOException { - - // Load and flush data 3 times - loadData(rows); - loadData(rows); - loadData(rows); - long num = getNumberOfMobFiles(conf, table.getName(), new String(fam)); - assertEquals(numRegions * 3, num); - // Major MOB compact - mobCompact(admin, hdt, hcd); - // wait until compaction is complete - while (admin.getCompactionState(hdt.getTableName()) != CompactionState.NONE) { - Thread.sleep(100); - } - - num = getNumberOfMobFiles(conf, table.getName(), new String(fam)); - assertEquals(numRegions * 4, num); - // We have guarantee, that compacted file discharger will run during this pause - // because it has interval less than this wait time - LOG.info("Waiting for {}ms", minAgeToArchive + 1000); - - 
Thread.sleep(minAgeToArchive + 1000); - LOG.info("Cleaning up MOB files"); - // Cleanup again - cleanerChore.cleanupObsoleteMobFiles(conf, table.getName()); - - num = getNumberOfMobFiles(conf, table.getName(), new String(fam)); - assertEquals(numRegions, num); - - long scanned = scanTable(); - assertEquals(3 * rows, scanned); - - } - - protected abstract void mobCompact(Admin admin2, HTableDescriptor hdt2, HColumnDescriptor hcd2) - throws IOException, InterruptedException; - - protected long getNumberOfMobFiles(Configuration conf, TableName tableName, String family) - throws IOException { - FileSystem fs = FileSystem.get(conf); - Path dir = MobUtils.getMobFamilyPath(conf, tableName, family); - FileStatus[] stat = fs.listStatus(dir); - for (FileStatus st : stat) { - LOG.debug("MOB Directory content: {}", st.getPath()); - } - LOG.debug("MOB Directory content total files: {}", stat.length); - - return stat.length; - } - - protected long scanTable() { - try { - - Result result; - ResultScanner scanner = table.getScanner(fam); - long counter = 0; - while ((result = scanner.next()) != null) { - assertTrue(Arrays.equals(result.getValue(fam, qualifier), mobVal)); - counter++; - } - return counter; - } catch (Exception e) { - LOG.error("MOB file compaction test FAILED", e); - if (HTU != null) { - fail(e.getMessage()); - } else { - System.exit(-1); - } - } - return 0; - } -} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionOptMode.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionOptMode.java index 47fcde9a233..816ea29b911 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionOptMode.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionOptMode.java @@ -17,10 +17,8 @@ */ package org.apache.hadoop.hbase.mob; -import java.io.IOException; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.testclassification.LargeTests; -import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.experimental.categories.Category; @@ -40,12 +38,13 @@ public class TestMobCompactionOptMode extends TestMobCompactionWithDefaults { public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestMobCompactionOptMode.class); - @BeforeClass - public static void configureOptimizedCompaction() throws InterruptedException, IOException { - HTU.shutdownMiniHBaseCluster(); + public TestMobCompactionOptMode(Boolean useFileBasedSFT) { + super(useFileBasedSFT); + } + + protected void additonalConfigSetup() { conf.set(MobConstants.MOB_COMPACTION_TYPE_KEY, MobConstants.OPTIMIZED_MOB_COMPACTION_TYPE); conf.setLong(MobConstants.MOB_COMPACTION_MAX_FILE_SIZE_KEY, 1000000); - HTU.startMiniHBaseCluster(); } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionOptRegionBatchMode.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionOptRegionBatchMode.java index 7b6b44d0e31..dfd6435d364 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionOptRegionBatchMode.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionOptRegionBatchMode.java @@ -23,9 +23,10 @@ import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.junit.Before; -import org.junit.BeforeClass; import org.junit.ClassRule; import 
org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,6 +40,7 @@ import org.slf4j.LoggerFactory; * time larger than minimum age to archive 10. Runs Mob cleaner chore 11 Verifies that number of MOB * files in a mob directory is 20. 12 Runs scanner and checks all 3 * 1000 rows. */ +@RunWith(Parameterized.class) @Category(LargeTests.class) public class TestMobCompactionOptRegionBatchMode extends TestMobCompactionWithDefaults { private static final Logger LOG = @@ -50,20 +52,20 @@ public class TestMobCompactionOptRegionBatchMode extends TestMobCompactionWithDe private static final int batchSize = 7; private MobFileCompactionChore compactionChore; + public TestMobCompactionOptRegionBatchMode(Boolean useFileBasedSFT) { + super(useFileBasedSFT); + } + @Before public void setUp() throws Exception { super.setUp(); compactionChore = new MobFileCompactionChore(conf, batchSize); } - @BeforeClass - public static void configureOptimizedCompactionAndBatches() - throws InterruptedException, IOException { - HTU.shutdownMiniHBaseCluster(); + protected void additonalConfigSetup() { conf.setInt(MobConstants.MOB_MAJOR_COMPACTION_REGION_BATCH_SIZE, batchSize); conf.set(MobConstants.MOB_COMPACTION_TYPE_KEY, MobConstants.OPTIMIZED_MOB_COMPACTION_TYPE); conf.setLong(MobConstants.MOB_COMPACTION_MAX_FILE_SIZE_KEY, 1000000); - HTU.startMiniHBaseCluster(); } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionRegularMode.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionRegularMode.java deleted file mode 100644 index 92809ea5511..00000000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionRegularMode.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.mob; - -import java.io.IOException; -import org.apache.hadoop.hbase.HBaseClassTestRule; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.testclassification.LargeTests; -import org.junit.ClassRule; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Mob file compaction chore in a regular non-batch mode test. 1. Uses default (non-batch) mode for - * regular MOB compaction, 2. Disables periodic MOB compactions, sets minimum age to archive to 10 - * sec 3. Creates MOB table with 20 regions 4. Loads MOB data (randomized keys, 1000 rows), flushes - * data. 5. Repeats 4. two more times 6. 
Verifies that we have 20 *3 = 60 mob files (equals to - * number of regions x 3) 7. Runs major MOB compaction. 8. Verifies that number of MOB files in a - * mob directory is 20 x4 = 80 9. Waits for a period of time larger than minimum age to archive 10. - * Runs Mob cleaner chore 11 Verifies that number of MOB files in a mob directory is 20. 12 Runs - * scanner and checks all 3 * 1000 rows. - */ -@SuppressWarnings("deprecation") -@Category(LargeTests.class) -public class TestMobCompactionRegularMode extends TestMobCompactionBase { - private static final Logger LOG = LoggerFactory.getLogger(TestMobCompactionRegularMode.class); - @ClassRule - public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestMobCompactionRegularMode.class); - - public TestMobCompactionRegularMode() { - } - - @Test - public void testMobFileCompactionBatchMode() throws InterruptedException, IOException { - LOG.info("MOB compaction regular mode started"); - baseTestMobFileCompaction(); - LOG.info("MOB compaction regular mode finished OK"); - - } - - @Override - protected void mobCompact(Admin admin, HTableDescriptor hdt, HColumnDescriptor hcd) - throws IOException, InterruptedException { - // Major compact MOB table - admin.majorCompact(hdt.getTableName(), hcd.getName()); - } - -} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionRegularRegionBatchMode.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionRegularRegionBatchMode.java index 3d6eaa0a25a..5e2806bdee2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionRegularRegionBatchMode.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionRegularRegionBatchMode.java @@ -23,9 +23,10 @@ import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.junit.Before; -import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,6 +40,7 @@ import org.slf4j.LoggerFactory; * to archive 10. Runs Mob cleaner chore 11 Verifies that number of MOB files in a mob directory is * 20. 12 Runs scanner and checks all 3 * 1000 rows. 
*/ +@RunWith(Parameterized.class) @Category(LargeTests.class) public class TestMobCompactionRegularRegionBatchMode extends TestMobCompactionWithDefaults { private static final Logger LOG = @@ -50,17 +52,18 @@ public class TestMobCompactionRegularRegionBatchMode extends TestMobCompactionWi private static final int batchSize = 7; private MobFileCompactionChore compactionChore; + public TestMobCompactionRegularRegionBatchMode(Boolean useFileBasedSFT) { + super(useFileBasedSFT); + } + @Before public void setUp() throws Exception { super.setUp(); compactionChore = new MobFileCompactionChore(conf, batchSize); } - @BeforeClass - public static void configureCompactionBatches() throws InterruptedException, IOException { - HTU.shutdownMiniHBaseCluster(); + protected void additionalConfigSetup() { conf.setInt(MobConstants.MOB_MAJOR_COMPACTION_REGION_BATCH_SIZE, batchSize); - HTU.startMiniHBaseCluster(); } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionWithDefaults.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionWithDefaults.java index 23e7c233496..df3eb29525b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionWithDefaults.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionWithDefaults.java @@ -23,6 +23,7 @@ import static org.junit.Assert.fail; import java.io.IOException; import java.util.Arrays; +import java.util.Collection; import java.util.List; import java.util.Random; import java.util.stream.Collectors; @@ -32,6 +33,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; @@ -43,18 +45,19 @@ import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.RegionSplitter; import org.junit.After; -import org.junit.AfterClass; import org.junit.Before; -import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.rules.TestName; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -68,6 +71,7 @@ import org.slf4j.LoggerFactory; * Runs Mob cleaner chore 11 Verifies that number of MOB files in a mob directory is 20. 12 Runs * scanner and checks all 3 * 1000 rows.
*/ +@RunWith(Parameterized.class) @Category(LargeTests.class) public class TestMobCompactionWithDefaults { private static final Logger LOG = LoggerFactory.getLogger(TestMobCompactionWithDefaults.class); @@ -97,8 +101,19 @@ public class TestMobCompactionWithDefaults { protected MobFileCleanerChore cleanerChore; - @BeforeClass - public static void htuStart() throws Exception { + protected Boolean useFileBasedSFT; + + public TestMobCompactionWithDefaults(Boolean useFileBasedSFT) { + this.useFileBasedSFT = useFileBasedSFT; + } + + @Parameterized.Parameters + public static Collection<Boolean> data() { + Boolean[] data = { false, true }; + return Arrays.asList(data); + } + + protected void htuStart() throws Exception { HTU = new HBaseTestingUtility(); conf = HTU.getConfiguration(); conf.setInt("hfile.format.version", 3); @@ -111,17 +126,21 @@ public class TestMobCompactionWithDefaults { // Set compacted file discharger interval to a half minAgeToArchive conf.setLong("hbase.hfile.compaction.discharger.interval", minAgeToArchive / 2); conf.setBoolean("hbase.regionserver.compaction.enabled", false); + if (useFileBasedSFT) { + conf.set(StoreFileTrackerFactory.TRACKER_IMPL, + "org.apache.hadoop.hbase.regionserver.storefiletracker.FileBasedStoreFileTracker"); + } + additionalConfigSetup(); HTU.startMiniCluster(); } - @AfterClass - public static void htuStop() throws Exception { - HTU.shutdownMiniCluster(); + protected void additionalConfigSetup() { } @Before public void setUp() throws Exception { - tableDescriptor = HTU.createModifyableTableDescriptor(test.getMethodName()); + htuStart(); + tableDescriptor = HTU.createModifyableTableDescriptor(TestMobUtils.getTableName(test)); admin = HTU.getAdmin(); cleanerChore = new MobFileCleanerChore(); familyDescriptor = new ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor(fam); @@ -158,6 +177,7 @@ public class TestMobCompactionWithDefaults { public void tearDown() throws Exception { admin.disableTable(tableDescriptor.getTableName()); admin.deleteTable(tableDescriptor.getTableName()); + HTU.shutdownMiniCluster(); } @Test @@ -173,12 +193,12 @@ @Test public void testMobFileCompactionAfterSnapshotClone() throws InterruptedException, IOException { - final TableName clone = TableName.valueOf(test.getMethodName() + "-clone"); + final TableName clone = TableName.valueOf(TestMobUtils.getTableName(test) + "-clone"); LOG.info("MOB compaction of cloned snapshot, " + description() + " started"); loadAndFlushThreeTimes(rows, table, famStr); LOG.debug("Taking snapshot and cloning table {}", table); - admin.snapshot(test.getMethodName(), table); - admin.cloneSnapshot(test.getMethodName(), clone); + admin.snapshot(TestMobUtils.getTableName(test), table); + admin.cloneSnapshot(TestMobUtils.getTableName(test), clone); assertEquals("Should have 3 hlinks per region in MOB area from snapshot clone", 3 * numRegions, getNumberOfMobFiles(clone, famStr)); mobCompact(admin.getDescriptor(clone), familyDescriptor); @@ -191,12 +211,12 @@ @Test public void testMobFileCompactionAfterSnapshotCloneAndFlush() throws InterruptedException, IOException { - final TableName clone = TableName.valueOf(test.getMethodName() + "-clone"); + final TableName clone = TableName.valueOf(TestMobUtils.getTableName(test) + "-clone"); LOG.info("MOB compaction of cloned snapshot after flush, " + description() + " started"); loadAndFlushThreeTimes(rows, table, famStr); LOG.debug("Taking snapshot and cloning table {}", table);
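// [editor's note] Why these hunks swap test.getMethodName() for TestMobUtils.getTableName(test):
// under the Parameterized runner the TestName rule reports method names such as
// "testMobFileCompactionAfterSnapshotClone[0]", and '[' / ']' are not legal characters in HBase
// table or snapshot names. The sanitizer this patch adds (see the TestMobUtils hunk below) is simply:
//   public static String getTableName(TestName test) {
//     return test.getMethodName().replace("[", "-").replace("]", "");
//   }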
- admin.snapshot(test.getMethodName(), table); - admin.cloneSnapshot(test.getMethodName(), clone); + admin.snapshot(TestMobUtils.getTableName(test), table); + admin.cloneSnapshot(TestMobUtils.getTableName(test), clone); assertEquals("Should have 3 hlinks per region in MOB area from snapshot clone", 3 * numRegions, getNumberOfMobFiles(clone, famStr)); loadAndFlushThreeTimes(rows, clone, famStr); @@ -275,8 +295,11 @@ Thread.sleep(minAgeToArchive + 1000); LOG.info("Cleaning up MOB files"); - // Cleanup again - cleanerChore.cleanupObsoleteMobFiles(conf, table); + + // run cleaner chore on each RS + for (ServerName sn : admin.getRegionServers()) { + HTU.getMiniHBaseCluster().getRegionServer(sn).getRSMobFileCleanerChore().chore(); + } assertEquals("After cleaning, we should have 1 MOB file per region based on size.", numRegions, getNumberOfMobFiles(table, family)); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFileCleanerChore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFileCleanerChore.java index bac0408eb4a..6ce127b994a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFileCleanerChore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFileCleanerChore.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.mob; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import java.io.IOException; @@ -32,6 +33,8 @@ import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; import org.apache.hadoop.hbase.client.CompactionState; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; @@ -168,14 +171,40 @@ Thread.sleep(minAgeToArchive + 1000); LOG.info("Cleaning up MOB files"); - // Cleanup again + // Cleanup chore.cleanupObsoleteMobFiles(conf, table.getName()); + // verify that nothing has happened num = getNumberOfMobFiles(conf, table.getName(), new String(fam)); - assertEquals(1, num); + assertEquals(4, num); long scanned = scanTable(); assertEquals(30, scanned); + + // add a MOB file with a name referring to a non-existent region + ColumnFamilyDescriptor familyDescriptor = ColumnFamilyDescriptorBuilder.newBuilder(fam) + .setMobEnabled(true).setMobThreshold(mobLen).setMaxVersions(1).build(); + Path extraMOBFile = MobTestUtil.generateMOBFileForRegion(conf, table.getName(), + familyDescriptor, "nonExistentRegion"); + num = getNumberOfMobFiles(conf, table.getName(), new String(fam)); + assertEquals(5, num); + + LOG.info("Waiting for {}ms", minAgeToArchive + 1000); + + Thread.sleep(minAgeToArchive + 1000); + LOG.info("Cleaning up MOB files"); + chore.cleanupObsoleteMobFiles(conf, table.getName()); + + // check that the extra file got deleted + num = getNumberOfMobFiles(conf, table.getName(), new String(fam)); + assertEquals(4, num); + + FileSystem fs = FileSystem.get(conf); + assertFalse(fs.exists(extraMOBFile)); + + scanned = scanTable(); + assertEquals(30, scanned); + } private long getNumberOfMobFiles(Configuration conf, TableName tableName, String family) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobStoreCompaction.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobStoreCompaction.java index 90b3d4085bf..123965c0eca 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobStoreCompaction.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobStoreCompaction.java @@ -26,11 +26,14 @@ import static org.junit.Assert.assertTrue; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.UUID; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -65,6 +68,7 @@ import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.RegionAsTable; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; +import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory; import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.testclassification.MediumTests; @@ -78,12 +82,15 @@ import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.rules.TestName; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * Test mob store compaction */ +@RunWith(Parameterized.class) @Category(MediumTests.class) public class TestMobStoreCompaction { @@ -108,13 +115,30 @@ public class TestMobStoreCompaction { private final byte[] STARTROW = Bytes.toBytes(START_KEY); private int compactionThreshold; + private Boolean useFileBasedSFT; + + public TestMobStoreCompaction(Boolean useFileBasedSFT) { + this.useFileBasedSFT = useFileBasedSFT; + } + + @Parameterized.Parameters + public static Collection<Boolean> data() { + Boolean[] data = { false, true }; + return Arrays.asList(data); + } + private void init(Configuration conf, long mobThreshold) throws Exception { + if (useFileBasedSFT) { + conf.set(StoreFileTrackerFactory.TRACKER_IMPL, + "org.apache.hadoop.hbase.regionserver.storefiletracker.FileBasedStoreFileTracker"); + } + this.conf = conf; this.mobCellThreshold = mobThreshold; HBaseTestingUtility UTIL = new HBaseTestingUtility(conf); compactionThreshold = conf.getInt("hbase.hstore.compactionThreshold", 3); - htd = UTIL.createTableDescriptor(name.getMethodName()); + htd = UTIL.createTableDescriptor(TestMobUtils.getTableName(name)); hcd = new HColumnDescriptor(COLUMN_FAMILY); hcd.setMobEnabled(true); hcd.setMobThreshold(mobThreshold); @@ -227,7 +251,7 @@ public class TestMobStoreCompaction { Path basedir = new Path(hbaseRootDir, htd.getNameAsString()); List<Pair<byte[], String>> hfiles = new ArrayList<>(1); for (int i = 0; i < compactionThreshold; i++) { - Path hpath = new Path(basedir, "hfile" + i); + Path hpath = new Path(basedir, UUID.randomUUID().toString().replace("-", "")); hfiles.add(Pair.newPair(COLUMN_FAMILY, hpath.toString())); createHFile(hpath, i, dummyData); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobUtils.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobUtils.java index 247dfff77d1..32170e98b35 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobUtils.java +++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobUtils.java @@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.util.Bytes; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet; import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSetMultimap; @@ -89,4 +90,8 @@ assertTrue(testTable3Refs.contains("file3a")); assertTrue(testTable3Refs.contains("file3b")); } + + public static String getTableName(TestName test) { + return test.getMethodName().replace("[", "-").replace("]", ""); + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestRSMobFileCleanerChore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestRSMobFileCleanerChore.java new file mode 100644 index 00000000000..98187631d96 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestRSMobFileCleanerChore.java @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mob; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.CompactionState; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Mob file cleaner chore test. 1. Creates MOB table 2. Loads MOB data and flushes it N times 3. Runs + major MOB compaction 4.
Verifies that number of MOB files in a mob directory is N+1. 5. Waits for + a period of time larger than minimum age to archive 6. Runs Mob cleaner chore 7. Verifies that + every old MOB file referenced from the current RS was archived + */ +@Category(MediumTests.class) +public class TestRSMobFileCleanerChore { + private static final Logger LOG = LoggerFactory.getLogger(TestRSMobFileCleanerChore.class); + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestRSMobFileCleanerChore.class); + + private HBaseTestingUtility HTU; + + private final static String famStr = "f1"; + private final static byte[] fam = Bytes.toBytes(famStr); + private final static byte[] qualifier = Bytes.toBytes("q1"); + private final static long mobLen = 10; + private final static byte[] mobVal = Bytes + .toBytes("01234567890123456789012345678901234567890123456789012345678901234567890123456789"); + + private Configuration conf; + private TableDescriptorBuilder.ModifyableTableDescriptor tableDescriptor; + private ColumnFamilyDescriptor familyDescriptor; + private Admin admin; + private Table table = null; + private RSMobFileCleanerChore chore; + private long minAgeToArchive = 10000; + + public TestRSMobFileCleanerChore() { + } + + @Before + public void setUp() throws Exception { + HTU = new HBaseTestingUtility(); + conf = HTU.getConfiguration(); + + initConf(); + + HTU.startMiniCluster(); + admin = HTU.getAdmin(); + familyDescriptor = ColumnFamilyDescriptorBuilder.newBuilder(fam).setMobEnabled(true) + .setMobThreshold(mobLen).setMaxVersions(1).build(); + tableDescriptor = + HTU.createModifyableTableDescriptor("testMobCompactTable").setColumnFamily(familyDescriptor); + table = HTU.createTable(tableDescriptor, Bytes.toByteArrays("1")); + } + + private void initConf() { + + conf.setInt("hfile.format.version", 3); + conf.setLong(TimeToLiveHFileCleaner.TTL_CONF_KEY, 0); + conf.setInt("hbase.client.retries.number", 100); + conf.setInt("hbase.hregion.max.filesize", 200000000); + conf.setInt("hbase.hregion.memstore.flush.size", 800000); + conf.setInt("hbase.hstore.blockingStoreFiles", 150); + conf.setInt("hbase.hstore.compaction.throughput.lower.bound", 52428800); + conf.setInt("hbase.hstore.compaction.throughput.higher.bound", 2 * 52428800); + // Disable automatic MOB compaction + conf.setLong(MobConstants.MOB_COMPACTION_CHORE_PERIOD, 0); + // Disable automatic MOB file cleaner chore + conf.setLong(MobConstants.MOB_CLEANER_PERIOD, 0); + // Set minimum age to archive to 10 sec + conf.setLong(MobConstants.MIN_AGE_TO_ARCHIVE_KEY, minAgeToArchive); + // Set compacted file discharger interval to a half minAgeToArchive + conf.setLong("hbase.hfile.compaction.discharger.interval", minAgeToArchive / 2); + } + + private void loadData(int start, int num) { + try { + + for (int i = 0; i < num; i++) { + Put p = new Put(Bytes.toBytes(start + i)); + p.addColumn(fam, qualifier, mobVal); + table.put(p); + } + admin.flush(table.getName()); + } catch (Exception e) { + LOG.error("MOB file cleaner chore test FAILED", e); + fail("MOB file cleaner chore test FAILED"); + } + } + + @After + public void tearDown() throws Exception { + admin.disableTable(tableDescriptor.getTableName()); + admin.deleteTable(tableDescriptor.getTableName()); + HTU.shutdownMiniCluster(); + } + + @Test + public void testMobFileCleanerChore() throws InterruptedException, IOException { + loadData(0, 10); + loadData(10, 10);
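// [editor's note] File accounting for the assertions that follow: each flush of MOB-sized cells
// writes one MOB file for this single-region table, so the two load+flush rounds above leave
// 2 MOB files. A major compaction writes the MOB cells into one new MOB file without deleting
// the old ones, which remain until a cleaner chore archives them, hence the expected 2 -> 3.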
+ long num = getNumberOfMobFiles(conf, table.getName(), new String(fam)); + assertEquals(2, num); + // Major compact + admin.majorCompact(tableDescriptor.getTableName(), fam); + // wait until compaction is complete + while (admin.getCompactionState(tableDescriptor.getTableName()) != CompactionState.NONE) { + Thread.sleep(100); + } + + num = getNumberOfMobFiles(conf, table.getName(), new String(fam)); + assertEquals(3, num); + // We have a guarantee that the compacted file discharger will run during this pause + // because its interval is less than this wait time + LOG.info("Waiting for {}ms", minAgeToArchive + 1000); + + Thread.sleep(minAgeToArchive + 1000); + LOG.info("Cleaning up MOB files"); + + ServerName serverUsed = null; + List<RegionInfo> serverRegions = null; + for (ServerName sn : admin.getRegionServers()) { + serverRegions = admin.getRegions(sn); + if (serverRegions != null && serverRegions.size() > 0) { + // filtering out non test table regions + serverRegions = serverRegions.stream().filter(r -> r.getTable().equals(table.getName())) + .collect(Collectors.toList()); + // if one is found, use this RS; otherwise keep looking at the remaining servers + if (serverRegions.size() > 0) { + serverUsed = sn; + break; + } + } + } + + chore = HTU.getMiniHBaseCluster().getRegionServer(serverUsed).getRSMobFileCleanerChore(); + + chore.chore(); + + num = getNumberOfMobFiles(conf, table.getName(), new String(fam)); + assertEquals(3 - serverRegions.size(), num); + + long scanned = scanTable(); + assertEquals(20, scanned); + + // creating a MOB file not referenced from the current RS + Path extraMOBFile = MobTestUtil.generateMOBFileForRegion(conf, table.getName(), + familyDescriptor, "nonExistentRegion"); + + // verifying the new MOB file is added + num = getNumberOfMobFiles(conf, table.getName(), new String(fam)); + assertEquals(4 - serverRegions.size(), num); + + FileSystem fs = FileSystem.get(conf); + assertTrue(fs.exists(extraMOBFile)); + + LOG.info("Waiting for {}ms", minAgeToArchive + 1000); + + Thread.sleep(minAgeToArchive + 1000); + LOG.info("Cleaning up MOB files"); + + // running chore again + chore.chore(); + + // the chore should only archive old MOB files that were referenced from the current RS; + // the unrelated MOB file is still there + num = getNumberOfMobFiles(conf, table.getName(), new String(fam)); + assertEquals(4 - serverRegions.size(), num); + + assertTrue(fs.exists(extraMOBFile)); + + scanned = scanTable(); + assertEquals(20, scanned); + } + + private long getNumberOfMobFiles(Configuration conf, TableName tableName, String family) + throws IOException { + FileSystem fs = FileSystem.get(conf); + Path dir = MobUtils.getMobFamilyPath(conf, tableName, family); + FileStatus[] stat = fs.listStatus(dir); + for (FileStatus st : stat) { + LOG.debug("MOB directory content: {} size={}", st.getPath(), st.getLen()); + } + LOG.debug("MOB directory content total files: {}", stat.length); + + return stat.length; + } + + private long scanTable() { + try { + + Result result; + ResultScanner scanner = table.getScanner(fam); + long counter = 0; + while ((result = scanner.next()) != null) { + assertTrue(Arrays.equals(result.getValue(fam, qualifier), mobVal)); + counter++; + } + return counter; + } catch (Exception e) { + LOG.error("MOB file cleaner chore test FAILED", e); + fail("MOB file cleaner chore test FAILED"); + } + return 0; + } +}
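Reviewer's note: for anyone porting other MOB tests to the parameterization pattern this patch applies, the sketch below distills it. It is illustrative only: the class name MobSFTParameterizedTestExample and its trivial test method are hypothetical (invented for this note), while the Parameterized wiring, the { false, true } data() provider, the per-test mini-cluster lifecycle, and the StoreFileTrackerFactory.TRACKER_IMPL / FileBasedStoreFileTracker setting are taken directly from the hunks above.

import static org.junit.Assert.assertNotNull;

import java.util.Arrays;
import java.util.Collection;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

// Hypothetical example class; not part of the patch.
@RunWith(Parameterized.class)
public class MobSFTParameterizedTestExample {

  private HBaseTestingUtility HTU;
  private Configuration conf;
  private final Boolean useFileBasedSFT;

  public MobSFTParameterizedTestExample(Boolean useFileBasedSFT) {
    this.useFileBasedSFT = useFileBasedSFT;
  }

  // The runner instantiates the class once per returned value, so every @Test
  // runs twice: once with the default tracker, once with the file-based one.
  @Parameterized.Parameters
  public static Collection<Boolean> data() {
    return Arrays.asList(false, true);
  }

  // The mini cluster is started per test rather than once per class (the old
  // @BeforeClass approach), because the tracker implementation must be set in
  // the configuration before startup and differs between parameter runs.
  @Before
  public void setUp() throws Exception {
    HTU = new HBaseTestingUtility();
    conf = HTU.getConfiguration();
    if (useFileBasedSFT) {
      conf.set(StoreFileTrackerFactory.TRACKER_IMPL,
        "org.apache.hadoop.hbase.regionserver.storefiletracker.FileBasedStoreFileTracker");
    }
    HTU.startMiniCluster();
  }

  @After
  public void tearDown() throws Exception {
    HTU.shutdownMiniCluster();
  }

  @Test
  public void testClusterStartsWithChosenTracker() throws Exception {
    // Smoke check: the cluster came up under the selected tracker implementation.
    assertNotNull(HTU.getAdmin());
  }
}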